線程池阻塞隊列之LinkedBlockingQueue


LinkedBlockingQueue介紹

LinkedBlockingQueue是一個單向鏈表實現的阻塞隊列。該隊列按 FIFO排序元素,新元素插入到隊列的尾部,並且隊列獲取操作會獲得位於隊列頭部的元素。

此外,LinkedBlockingQueue可以指定隊列的容量。如果不指定,默認容量大小等於Integer.MAX_VALUE。

LinkedBlockingQueue原理和數據結構

  1. LinkedBlockingQueue繼承於AbstractQueue,它本質上是一個FIFO(先進先出)的隊列。

  2. LinkedBlockingQueue實現了BlockingQueue接口,它支持多線程並發。當多線程競爭同一個資源時,某線程獲取到該資源之后,其它線程需要阻塞等待。

  3. LinkedBlockingQueue是通過單鏈表實現的:

      1. head是鏈表的表頭。取出數據時,都是從表頭head處取出。

      2. last是鏈表的表尾。新增數據時,都是從表尾last處插入。

      3. count是鏈表的實際大小,即當前鏈表中包含的節點個數。

      4. capacity是列表的容量,它是在創建鏈表時指定的。

      5. putLock是插入鎖,takeLock是取出鎖;notEmpty是“非空條件”,notFull是“未滿條件”。通過它們對鏈表進行並發控制。LinkedBlockingQueue在實現“多線程對競爭資源的互斥訪問”時,對於“插入”和“取出(刪除)”操作分別使用了不同的鎖。對於插入操作,通過“插入鎖putLock”進行同步;對於取出操作,通過“取出鎖takeLock”進行同步。此外,插入鎖putLock和“非滿條件notFull”相關聯,取出鎖takeLock和“非空條件notEmpty”相關聯。通過notFull和notEmpty更細膩的控制鎖。

若某線程(線程A)要取出數據時,隊列正好為空,則該線程會執行notEmpty.await()進行等待;當其它某個線程(線程B)向隊列中插入了數據之后,會調用notEmpty.signal()喚醒“notEmpty上的等待線程”。此時,線程A會被喚醒從而得以繼續運行。 此外,線程A在執行取操作前,會獲取takeLock,在取操作執行完畢再釋放takeLock。

若某線程(線程H)要插入數據時,隊列已滿,則該線程會它執行notFull.await()進行等待;當其它某個線程(線程I)取出數據之后,會調用notFull.signal()喚醒“notFull上的等待線程”。此時,線程H就會被喚醒從而得以繼續運行。 此外,線程H在執行插入操作前,會獲取putLock,在插入操作執行完畢才釋放putLock。

LinkedBlockingQueue函數列表

 

LinkedBlockingQueue源碼分析

下面從LinkedBlockingQueue的創建,添加,刪除,遍歷這幾個方面對它進行分析。

1. 創建

下面以LinkedBlockingQueue(int capacity)來進行說明。

說明:

  1. capacity是“LinkedBlockingQueue”的容量。

  1. head和last是“LinkedBlockingQueue”的首節點和尾節點。它們在LinkedBlockingQueue中的聲明如下:

 

 

鏈表的節點定義如下:

 

 2. 添加

下面以offer(E e)為例,對LinkedBlockingQueue的添加方法進行說明。

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    // 如果“隊列已滿”,則返回false,表示插入失敗。
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    // 新建“節點e”
    Node<E> node = new Node(e);
    final ReentrantLock putLock = this.putLock;
    // 獲取“插入鎖putLock”
    putLock.lock();
    try {
        // 再次對“隊列是不是滿”的進行判斷。
        // 若“隊列未滿”,則插入節點。
        if (count.get() < capacity) {
            // 插入節點
            enqueue(node);
            // 將“當前節點數量”+1,並返回“原始的數量”
            c = count.getAndIncrement();
            // 如果在插入元素之后,隊列仍然未滿,則喚醒notFull上的等待線程。
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        // 釋放“插入鎖putLock”
        putLock.unlock();
    }
    // 如果在插入節點前,隊列為空;則插入節點后,喚醒notEmpty上的等待線程
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

說明:offer()的作用很簡單,就是將元素E添加到隊列的末尾。 enqueue()的源碼如下:

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

enqueue()的作用是將node添加到隊列末尾,並設置node為新的尾節點! signalNotEmpty()的源碼如下:

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

signalNotEmpty()的作用是喚醒notEmpty上的等待線程。

3. 取出

下面以take()為例,對LinkedBlockingQueue的取出方法進行說明。

說明:take()的作用是取出並返回隊列的頭。若隊列為空,則一直等待。 dequeue()的源碼如下:

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

dequeue()的作用就是刪除隊列的頭節點,並將表頭指向“原頭節點的下一個節點”。 signalNotFull()的源碼如下:

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

signalNotFull()的作用就是喚醒notFull上的等待線程。

4. 遍歷

下面對LinkedBlockingQueue的遍歷方法進行說明。

public Iterator<E> iterator() {
  return new Itr();
}

iterator()實際上是返回一個Iter對象。 Itr類的定義如下:

private class Itr implements Iterator<E> {
    // 當前節點
    private Node<E> current;
    // 上一次返回的節點
    private Node<E> lastRet;
    // 當前節點對應的值
    private E currentElement;
​
    Itr() {
        // 同時獲取“插入鎖putLock” 和 “取出鎖takeLock”
        fullyLock();
        try {
            // 設置“當前元素”為“隊列表頭的下一節點”,即為隊列的第一個有效節點
            current = head.next;
            if (current != null)
                currentElement = current.item;
        } finally {
            // 釋放“插入鎖putLock” 和 “取出鎖takeLock”
            fullyUnlock();
        }
    }
​
    // 返回“下一個節點是否為null”
    public boolean hasNext() {
        return current != null;
    }
​
    private Node<E> nextNode(Node<E> p) {
        for (;;) {
            Node<E> s = p.next;
            if (s == p)
                return head.next;
            if (s == null || s.item != null)
                return s;
            p = s;
        }
    }
​
    // 返回下一個節點
    public E next() {
        fullyLock();
        try {
            if (current == null)
                throw new NoSuchElementException();
            E x = currentElement;
            lastRet = current;
            current = nextNode(current);
            currentElement = (current == null) ? null : current.item;
            return x;
        } finally {
            fullyUnlock();
        }
    }
​
    // 刪除下一個節點
    public void remove() {
        if (lastRet == null)
            throw new IllegalStateException();
        fullyLock();
        try {
            Node<E> node = lastRet;
            lastRet = null;
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (p == node) {
                    unlink(p, trail);
                    break;
                }
            }
        } finally {
            fullyUnlock();
        }
    }
}

 

LinkedBlockingQueue示例

import java.util.*;
import java.util.concurrent.*;

/*
 *   LinkedBlockingQueue是“線程安全”的隊列,而LinkedList是非線程安全的。
 *
 *   下面是“多個線程同時操作並且遍歷queue”的示例
 *   (01) 當queue是LinkedBlockingQueue對象時,程序能正常運行。
 *   (02) 當queue是LinkedList對象時,程序會產生ConcurrentModificationException異常。
 *
 */
public class LinkedBlockingQueueDemo1 {

    // TODO: queue是LinkedList對象時,程序會出錯。
    //private static Queue<String> queue = new LinkedList<>();
    private static Queue<String> queue = new LinkedBlockingQueue<>();
    public static void main(String[] args) {
    
        // 同時啟動兩個線程對queue進行操作!
        new MyThread("ta").start();
        new MyThread("tb").start();
    }

    private static void printAll() {
        String value;
        Iterator iter = queue.iterator();
        while(iter.hasNext()) {
            value = (String)iter.next();
            System.out.print(value+", ");
        }
        System.out.println();
    }

    private static class MyThread extends Thread {
        MyThread(String name) {
            super(name);
        }
        @Override
        public void run() {
                int i = 0;
            while (i++ < 6) {
                // “線程名” + "-" + "序號"
                String val = Thread.currentThread().getName()+i;
                queue.add(val);
                // 通過“Iterator”遍歷queue。
                printAll();
            }
        }
    }
}

其中一次運行結果:

tb1, ta1, 
tb1, ta1, ta2, 
tb1, ta1, ta2, ta3, 
tb1, ta1, ta2, ta3, ta4, 
tb1, ta1, tb1, ta2, ta1, ta3, ta2, ta4, ta3, ta5, 
ta4, tb1, ta5, ta1, ta6, 
ta2, tb1, ta3, ta1, ta4, ta2, ta5, ta3, ta6, ta4, tb2, 
ta5, ta6, tb2, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5, tb6,

結果說明: 示例程序中,啟動兩個線程(線程ta和線程tb)分別對LinkedBlockingQueue進行操作。以線程ta而言,它會先獲取“線程名”+“序號”,然后將該字符串添加到LinkedBlockingQueue中;接着,遍歷並輸出LinkedBlockingQueue中的全部元素。 線程tb的操作和線程ta一樣,只不過線程tb的名字和線程ta的名字不同。 當queue是LinkedBlockingQueue對象時,程序能正常運行。如果將queue改為LinkedList時,程序會產生ConcurrentModificationException異常。

 

參考:https://github.com/wangzhiwubigdata/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM