並發編程之並發隊列


一、並發隊列

在並發隊列上JDK提供了兩套實現,

一個是以ConcurrentLinkedQueue為代表的高性能隊列非阻塞,

一個是以BlockingQueue接口為代表的阻塞隊列,無論哪種都繼承自Queue。

1、阻塞隊列與非阻塞隊

阻塞隊列與普通隊列的區別在於:

阻塞隊列:

  • 當隊列是空的時,從隊列中獲取元素的操作將會被阻塞,試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其他的線程往空的隊列插入新的元素;
  • 當隊列是滿時,往隊列里添加元素的操作會被阻塞。試圖往已滿的阻塞隊列中添加新元素的線程同樣也會被阻塞,直到其他的線程使隊列重新變得空閑起來,如從隊列中移除一個或者多個元素,或者完全清空隊列.

2、ConcurrentLinkedQeque

ConcurrentLinkedQueue : 是一個適用於高並發場景下的隊列,通過無鎖的方式,實現
了高並發狀態下的高性能,通常ConcurrentLinkedQueue性能好於BlockingQueue.它
是一個基於鏈接節點的無界線程安全隊列。該隊列的元素遵循先進先出的原則。頭是最先
加入的,尾是最近加入的,該隊列不允許null元素。

// 非阻塞式隊列,無界隊列
ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
	q.offer("張三");
	q.offer("李四");
	q.offer("王五");
	//從頭獲取元素,刪除該元素
	System.out.println(q.poll());
	//從頭獲取元素,不刪除該元素
	System.out.println(q.peek());
	//獲取總長度
	System.out.println(q.size());

3、BlockingQueue

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:

  • 在隊列為空時,獲取元素的線程會等待隊列變為非空。
  • 當隊列滿時,存儲元素的線程會等待隊列可用。

在Java中,BlockingQueue的接口位於java.util.concurrent 包中(在Java5版本開始提供),由上面介紹的阻塞隊列的特性可知,阻塞隊列是線程安全的。

1)、ArrayBlockingQueue

ArrayBlockingQueue是一個有邊界的阻塞隊列,它的內部實現是一個數組。有邊界的意思是它的容量是有限的,我們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可改變。

ArrayBlockingQueue是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。下面
是一個初始化和使用ArrayBlockingQueue的例子:

<String> arrays = new ArrayBlockingQueue<String>(3);
	arrays.offer("張三");
	 arrays.offer("李四");
	arrays.offer("王五");
	arrays.offer("666", 3, TimeUnit.SECONDS); // 隊列滿了,阻塞3秒后向下執行
	System.out.println(arrays.poll()); // 張三
	System.out.println(arrays.poll()); // 李四
	System.out.println(arrays.poll()); // 王五
	System.out.println(arrays.poll(3, TimeUnit.SECONDS)); //隊列為空,阻塞3秒后結束

2)、LinkedBlockingQueue

LinkedBlockingQueue阻塞隊列大小的配置是可選的,如果我們初始化時指定一個大小,它就是有邊界的,如果不指定,它就是無邊界的。說是無邊界,其實是采用了默認大小為Integer.MAX_VALUE的容量 。它的內部實現是一個鏈表。

和ArrayBlockingQueue一樣,LinkedBlockingQueue 也是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。下面是一個初始化和使LinkedBlockingQueue的例子:

LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3);
linkedBlockingQueue.add("張三");
linkedBlockingQueue.add("李四");
linkedBlockingQueue.add("李四");
System.out.println(linkedBlockingQueue.size()); // 3

3)、PriorityBlockingQueue(有界,快滿時自動擴容,看似無界)

PriorityBlockingQueue是一個沒有邊界的隊列,它的排序規則和 java.util.PriorityQueue一樣。需要注意,PriorityBlockingQueue中允許插入null對象。

所有插入PriorityBlockingQueue的對象必須實現 java.lang.Comparable接口,隊列優先級的排序規則就
是按照我們對這個接口的實現來定義的。

另外,我們可以從PriorityBlockingQueue獲得一個迭代器Iterator,但這個迭代器並不保證按照優先級順序進行迭代。

4)、SynchronousQueue

SynchronousQueue隊列內部僅允許容納一個元素。當一個線程插入一個元素后會被阻塞,除非這個元素被另一個線程消費。

5)、使用BlockingQueue模擬生產者與消費者

class ProducerThread implements Runnable {
	private BlockingQueue<String> blockingQueue;
	private AtomicInteger count = new AtomicInteger();
	private volatile boolean FLAG = true;

	public ProducerThread(BlockingQueue<String> blockingQueue) {
		this.blockingQueue = blockingQueue;
	}

	@Override
	public void run() {
		System.out.println(Thread.currentThread().getName() + "生產者開始啟動....");
		while (FLAG) {
			String data = count.incrementAndGet() + "";
			try {
				boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
				if (offer) {
					System.out.println(Thread.currentThread().getName() + ",生產隊列" + data + "成功..");
				} else {
					System.out.println(Thread.currentThread().getName() + ",生產隊列" + data + "失敗..");
				}
				Thread.sleep(1000);
			} catch (Exception e) {

			}
		}
		System.out.println(Thread.currentThread().getName() + ",生產者線程停止...");
	}

	public void stop() {
		this.FLAG = false;
	}

}

class ConsumerThread implements Runnable {
	private volatile boolean FLAG = true;
	private BlockingQueue<String> blockingQueue;

	public ConsumerThread(BlockingQueue<String> blockingQueue) {
		this.blockingQueue = blockingQueue;
	}

	@Override
	public void run() {
		System.out.println(Thread.currentThread().getName() + "消費者開始啟動....");
		while (FLAG) {
			try {
				String data = blockingQueue.poll(2, TimeUnit.SECONDS);
				if (data == null || data == "") {
					FLAG = false;
					System.out.println("消費者超過2秒時間未獲取到消息.");
					return;
				}
				System.out.println("消費者獲取到隊列信息成功,data:" + data);

			} catch (Exception e) {
				// TODO: handle exception
			}
		}
	}

}

public class Test0008 {

	public static void main(String[] args) {
		BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
		ProducerThread producerThread = new ProducerThread(blockingQueue);
		ConsumerThread consumerThread = new ConsumerThread(blockingQueue);
		Thread t1 = new Thread(producerThread);
		Thread t2 = new Thread(consumerThread);
		t1.start();
		t2.start();
		//10秒后 停止線程..
		try {
			Thread.sleep(10*1000);
			producerThread.stop();
		} catch (Exception e) {
			// TODO: handle exception
		}
	}

}

  1. ArrayDeque, (數組雙端隊列)
  2. PriorityQueue, (優先級隊列)
  3. ConcurrentLinkedQueue, (基於鏈表的並發隊列)
  4. DelayQueue, (延期阻塞隊列)(阻塞隊列實現了BlockingQueue接口)
  5. ArrayBlockingQueue, 常用(基於數組的並發阻塞隊列)
  6. LinkedBlockingQueue, 常用(基於鏈表的FIFO阻塞隊列)
  7. LinkedBlockingDeque, (基於鏈表的FIFO雙端阻塞隊列)
  8. PriorityBlockingQueue,常用 (帶優先級的無界阻塞隊列,)
  9. SynchronousQueue常用 (並發同步阻塞隊列)

本文由博客一文多發平台 OpenWrite 發布!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM