阻塞隊列是 java.util.concurrent 包提供的一個類,該類提供了多線程中通過隊列實現安全高效的數據處理的功能。
所謂阻塞隊列,是在普通隊列基礎上實現了阻塞線程的功能:
- 隊列為空時,獲取元素的線程阻塞,直到隊列變為非空。
- 當隊列滿時,存儲元素的線程阻塞,直到隊列可用(非滿)。
以下是阻塞隊列實現阻塞線程的兩種常用場景:
阻塞隊列提供的方法:
插入方法:
1. boolean add(E e):隊列沒有滿,則插入數據並返回true;隊列滿時,拋出異常 java.lang.IllegalStateException: Queue full。
2. boolean offer(E e):隊列沒有滿,則插入數據並返回true;隊列滿時,返回false。
3. void put(E e):隊列沒有滿,則插入數據;隊列滿時,阻塞調用此方法線程,直到隊列有空閑空間時此線程進入就緒狀態。
4. boolean offer(E e, long timeout, TimeUnit unit):隊列沒有滿,插入數據並返回true;隊列滿時,阻塞調用此方法線程,若指定等待的時間內還不能往隊列中插入數據,返回false。
移除方法:
1. E remove():隊列非空,則以FIFO原則移除數據,並返回該數據的值;隊列為空,拋出異常 java.util.NoSuchElementException。
2. E poll():隊列非空,移除數據,並返回該數據的值;隊列為空,返回null。
3. E take():隊列非空,移除數據,並返回該數據的值;隊列為空,阻塞調用此方法線程,直到隊列為非空時此線程進入就緒狀態。
4. E poll(long timeout, TimeUnit unit):隊列非空,移除數據,並返回該數據的值;隊列為空,阻塞調用此方法線程,若指定等待的時間內隊列都沒有數據可取,返回null。
檢查方法:
1. E element():隊列非空,則返回隊首元素;隊列為空,拋出異常 java.util.NoSuchElementException。
2. E peek():隊列非空,則返回隊首元素;隊列為空,返回null。
獲取所有成員的方法:
1. int drainTo(Collection<? super E> c):一次性從BlockingQueue獲取所有可用的數據對象存入集合中。
2. int drainTo(Collection<? super E> c, int maxElements):從BlockingQueue獲取指定數據的個數的對象存入集合中。
JDK提供的阻塞隊列:
1. ArrayBlockingQueue :一個由數組結構實現的有界阻塞隊列。
ArrayBlockingQueue內部,維護了一個定長數組。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認情況下對象內部采用非公平鎖,所謂公平鎖是指阻塞的所有生產者線程(或消費者線程),當隊列可用時,可以按照阻塞的先后順序訪問隊列,即先阻塞的生產者線程,可以先往隊列里插入元素(先阻塞的消費者線程,可以先從隊列里獲取元素)。通常情況下為了保證公平性會降低吞吐量。
2. LinkedBlockingQueue :一個由鏈表結構實現的有界阻塞隊列。
LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度為Integer.MAX_VALUE。按照先進先出的原則對元素進行排序。
3. DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在創建元素時可以指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。使用場景:常見的例子比如使用一個DelayQueue來管理一個超時未響應的連接隊列。
4. PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
PriorityBlockingQueue是一個支持優先級的無界隊列。默認情況下元素采取自然順序排列,也可以通過比較器comparator來指定元素的排序規則。元素按照升序排列。
5. SynchronousQueue:一個不存儲元素的阻塞隊列。
SynchronousQueue是一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作,否則不能繼續添加元素。
6. LinkedTransferQueue:一個由鏈表結構實現的無界阻塞隊列。
LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對於其他阻塞隊列LinkedTransferQueue多了tryTransfer和transfer方法。
7. LinkedBlockingDeque:一個由鏈表結構實現的雙向阻塞隊列。
LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的你可以從隊列的兩端插入和移出元素。雙端隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。
阻塞隊列常用場景是 “生產者—消費者” 模式,以下是一個生產者不斷生產隨機數據存入隊列,消費者不斷獲取的實例:
import java.util.Random; import java.util.concurrent.*; public class BlockingQueueTest { /** * 生產者 */ public static class Producer implements Runnable { /** * 阻塞隊列 */ private BlockingQueue<Integer> blockingQueue; /** * 判斷是否循環 */ private boolean isRunning = true; /** * 隨機數據范圍 */ private static final int RANGE_FOR_DATA = 1000; private Random random = new Random(); public Producer(BlockingQueue<Integer> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { while (isRunning) { try { /** 生產出隨機數 */ int data = random.nextInt(RANGE_FOR_DATA); System.out.println(Thread.currentThread().getName() + " 生產數據:" + data); /** 將隨機數放入阻塞隊列 */ blockingQueue.put(data); System.out.println(Thread.currentThread().getName() + " 插入隊列:" + data); /** 進行隨機時間休眠 */ Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { System.out.println("程序結束啦,我不用再等待阻塞隊列有空余位置了!"); } } } /** * 終止生產線程 */ public void shutDown() { isRunning = false; } } /** * 消費者 */ public static class Consumer implements Runnable { /** * 阻塞隊列 */ private BlockingQueue<Integer> blockingQueue; /** * 判斷是否循環 */ private boolean isRunning = true; /** * 隨機數據范圍 */ private Random random = new Random(); public Consumer(BlockingQueue<Integer> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { while (isRunning) { try { /** 從阻塞隊列中獲取隨機數 */ int data = (int) blockingQueue.take(); System.out.println(Thread.currentThread().getName() + " 消費數據:" + data); /** 進行隨機時間休眠 */ Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { System.out.println("程序結束啦,我不用再等待阻塞隊列非空了!"); } } } /** * 終止消費線程 */ public void shutDown() { isRunning = false; } } public static void main(String[] args) { /** 創建容量大小為5的阻塞隊列 */ BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(5); /** 創建連接池 */ ExecutorService pool = Executors.newCachedThreadPool(); /** 創建生產線程,消費線程各5個 */ Producer[] producers = new Producer[5]; Consumer[] consumers = new Consumer[5]; /** 實例化生產線程與消費線程並且執行線程 */ for (int i = 0; i < producers.length; i++) { producers[i] = new Producer(blockingQueue); consumers[i] = new Consumer(blockingQueue); pool.execute(producers[i]); pool.execute(consumers[i]); } try { /** 等待5秒后進行手動中斷 */ Thread.sleep(5 * 1000); for (int i = 0; i < producers.length; i++) { producers[i].shutDown(); consumers[i].shutDown(); } /** 其實提不提醒線程關閉都一個樣了,阻塞的線程,不會因為手動中斷而中斷的 */ pool.shutdown(); /** 等待2秒,若還有線程沒有關閉則強行中斷所有等待線程 */ if (!pool.awaitTermination(2 * 1000, TimeUnit.MILLISECONDS)) { /** 超時的時候向線程池中所有的線程發出中斷 */ pool.shutdownNow(); } } catch (InterruptedException e) { e.printStackTrace(); } } }
以下是對火車售票系統的模擬,在火車票發售的短短幾分鍾會有眾多的人進行下單,這就是一個處理並發的問題。我們可以將用戶請求存入阻塞隊列,然后再對這些請求一一處理。
import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.*; public class TrainTicketingSystem { /** * 用戶線程數量,同時進行搶票 */ private static final int USER_THREAD_COUNT = 300; /** * 火車票數量 */ private static final int TICKET_COUNT = 500; /** * 用戶搶票線程,將用戶名放入阻塞隊列等待系統處理。 */ public static class User implements Runnable { /** * 用戶名 */ private String username; /** * 火車票 */ private Ticket ticket; private BlockingQueue<String> blockingQueue; private Random random = new Random(); public User(String username, BlockingQueue<String> blockingQueue) { this.username = username; this.blockingQueue = blockingQueue; } public void setUsername(String username) { this.username = username; } public String getUsername() { return username; } public void setTicket(Ticket ticket) { this.ticket = ticket; } public Ticket getTicket() { return ticket; } @Override public void run() { try { /** 在休眠1s以內后開始搶票 */ Thread.sleep(random.nextInt(1000)); System.out.println(username + "開始搶票"); /** 將用戶名放入阻塞隊列 */ blockingQueue.put(username); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 火車票,只是一個POJO */ public static class Ticket { /** * 火車票編號 */ private String tid; /** * 用戶名 */ private String username; public Ticket(String tid) { this.tid = tid; } public String getTid() { return tid; } public void setTid(String tid) { this.tid = tid; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } } /** * 售票線程,取出阻塞隊列中的用戶名,將火車票賣給用戶名對應的用戶 */ public static class SellTicket implements Runnable { /** * 火車票組 */ private Ticket[] tickets; /** * 阻塞隊列 */ private BlockingQueue<String> blockingQueue; public SellTicket(Ticket[] tickets, BlockingQueue<String> blockingQueue) { this.tickets = tickets; this.blockingQueue = blockingQueue; } @Override public void run() { for (int i = 0; i < tickets.length; i++) { try { String username = blockingQueue.take(); tickets[i].setUsername(username); System.out.println(username + " 搶票成功,火車票編號:" + tickets[i].getTid()); } catch (InterruptedException e) { System.out.println("售票時間截止!"); break; } } } } public static void main(String[] args) { /** 存放用戶線程的數組 */ User[] users = new User[USER_THREAD_COUNT]; /** 存放火車票的數組 */ Ticket[] tickets = new Ticket[TICKET_COUNT]; /** 鏈表存儲結構的無界阻塞隊列 */ BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(); /** 連接池 */ ExecutorService pool = Executors.newCachedThreadPool(); /** 創建火車票實例 */ for (int i = 0; i < tickets.length; i++) { tickets[i] = new Ticket("#00000" + String.valueOf(i)); } /** 創建並啟動用戶線程組中的線程 */ for (int i = 0; i < users.length; i++) { users[i] = new User("user" + String.valueOf(i), blockingQueue); pool.execute(users[i]); } /** 創建並啟動售賣火車票線程 */ pool.execute(new SellTicket(tickets, blockingQueue)); try { /** 使用集合將阻塞隊列中的所有元素存入,統計搶票失敗的人數 */ Thread.sleep(4 * 1000); List<String> list = new ArrayList<String>(); int count = blockingQueue.drainTo(list); System.out.println("************ 一共有:" + count + "個人沒有搶到票 ************"); list.forEach((s) -> System.out.println("*** 用戶" + s + "搶票失敗 ***")); /** 中斷所有運行的線程,由於是無界阻塞隊列,就是中斷售票線程,意思是停止售票 */ pool.shutdown(); if (!pool.awaitTermination(3 * 1000, TimeUnit.MILLISECONDS)) { /** 超時的時候向線程池中所有的線程發出中斷 */ pool.shutdownNow(); } try { Thread.sleep(4 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { /** 將火車票傳給對應的用戶 */ for (int i = 0; i < tickets.length; i++) { /** 若是淡季,火車票不會被賣完,火車票對應的用戶名就為空 */ if (tickets[i].getUsername() != null) { int id = Integer.parseInt(tickets[i].getUsername().substring(4)); users[id].setTicket(tickets[i]); } } /** 將用戶信息依次輸出 */ System.out.println("************ 以下是所有用戶信息 ************"); for (int i = 0; i < users.length; i++) { /** 若是旺季,很多用戶沒有搶到票ticket就為空 */ if (users[i].getTicket() != null) { System.out.println("*** 火車票編號:" + users[i].getTicket().getTid() + " ***"); System.out.println("*** 用戶名: " + users[i].getTicket().getUsername() + " ***"); } else { System.out.println("*** 火車票編號:" + "#####" + " ***"); System.out.println("*** 用戶名: " + users[i].getUsername() + " ***"); } } } } }