1 api
java.util.concurrent包下的新類。LinkedBlockingQueue就是其中之一,是一個阻塞的線程安全的隊列,底層采用鏈表實現。
LinkedBlockingQueue構造的時候若沒有指定大小,則默認大小為Integer.MAX_VALUE,當然也可以在構造函數的參數中指定大小。
LinkedBlockingQueue不接受null。
添加元素的方法有三個:add,put,offer,且這三個元素都是向隊列尾部添加元素的意思。
區別:
add方法在添加元素的時候,若超出了度列的長度會直接拋出異常:
xxxxxxxxxx
16
16
1
public static void main(String args[]){
2
try {
3
LinkedBlockingQueue<String> queue=new LinkedBlockingQueue(2);
4
5
queue.add("hello");
6
queue.add("world");
7
queue.add("yes");
8
} catch (Exception e) {
9
// TODO: handle exception
10
e.printStackTrace();
11
}
12
}
13
//運行結果:
14
java.lang.IllegalStateException: Queue full
15
at java.util.AbstractQueue.add(Unknown Source)
16
at com.wjy.test.GrandPather.main(GrandPather.java:12)
xxxxxxxxxx
17
17
1
public static void main(String args[]){
2
try {
3
LinkedBlockingQueue<String> queue=new LinkedBlockingQueue(2);
4
5
queue.put("hello");
6
queue.put("world");
7
queue.put("yes");
8
9
System.out.println("yes");
10
} catch (Exception e) {
11
// TODO: handle exception
12
e.printStackTrace();
13
}
14
}
15
//運行結果:
16
//在queue.put("yes")處發生阻塞
17
//下面的“yes”無法輸出
offer方法在添加元素時,如果發現隊列已滿無法添加的話,會直接返回false。
xxxxxxxxxx
22
22
1
public static void main(String args[]){
2
try {
3
LinkedBlockingQueue<String> queue=new LinkedBlockingQueue(2);
4
5
boolean bol1=queue.offer("hello");
6
boolean bol2=queue.offer("world");
7
boolean bol3=queue.offer("yes");
8
9
System.out.println(queue.toString());
10
System.out.println(bol1);
11
System.out.println(bol2);
12
System.out.println(bol3);
13
} catch (Exception e) {
14
// TODO: handle exception
15
e.printStackTrace();
16
}
17
}
18
//運行結果:
19
[hello, world]
20
true
21
true
22
false
從隊列中取出並移除頭元素的方法有:poll,remove,take。
poll: 若隊列為空,返回null。
remove:若隊列為空,拋出NoSuchElementException異常。
take:若隊列為空,發生阻塞,等待有元素。
2基於
LinkedBlockingQueue的生產者和消費者
85
1
package com.queue;
2
import java.util.concurrent.BlockingQueue;
3
import java.util.concurrent.ExecutorService;
4
import java.util.concurrent.Executors;
5
import java.util.concurrent.LinkedBlockingQueue;
6
public class LinkedBlockingQueueTest1 {
7
public static void main(String[] args) {
8
LinkedBlockingQueueTest1 test = new LinkedBlockingQueueTest1();
9
// 建立一個裝蘋果的籃子
10
Basket basket = test.new Basket();
11
ExecutorService service = Executors.newCachedThreadPool();
12
Producer producer = test.new Producer("生產者001", basket);
13
Producer producer2 = test.new Producer("生產者002", basket);
14
Consumer consumer = test.new Consumer("消費者001", basket);
15
service.submit(producer);
16
service.submit(producer2);
17
service.submit(consumer);
18
// 程序運行5s后,所有任務停止
19
try {
20
Thread.sleep(1000 * 5);
21
} catch (InterruptedException e) {
22
e.printStackTrace();
23
}
24
service.shutdownNow();
25
}
26
27
//定義籃子
28
public class Basket {
29
// 籃子,能夠容納3個蘋果
30
BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);
31
// 生產蘋果,放入籃子
32
public void produce() throws InterruptedException {
33
// put方法放入一個蘋果,若basket滿了,等到basket有位置
34
basket.put("An apple");
35
}
36
// 消費蘋果,從籃子中取走
37
public String consume() throws InterruptedException {
38
// take方法取出一個蘋果,若basket為空,等到basket有蘋果為止(獲取並移除此隊列的頭部)
39
return basket.take();
40
}
41
}
42
// 定義蘋果生產者
43
class Producer implements Runnable {
44
private String instance;
45
private Basket basket;
46
public Producer(String instance, Basket basket) {
47
this.instance = instance;
48
this.basket = basket;
49
}
50
public void run() {
51
try {
52
while (true) {
53
// 生產蘋果
54
System.out.println(instance + "生產蘋果");
55
basket.produce();
56
// 休眠300ms
57
Thread.sleep(300);
58
}
59
} catch (InterruptedException ex) {
60
System.out.println("Producer Interrupted");
61
}
62
}
63
}
64
// 定義蘋果消費者
65
class Consumer implements Runnable {
66
private String instance;
67
private Basket basket;
68
public Consumer(String instance, Basket basket) {
69
this.instance = instance;
70
this.basket = basket;
71
}
72
public void run() {
73
try {
74
while (true) {
75
// 消費蘋果
76
System.out.println(instance + "消費蘋果" + basket.consume());
77
// 休眠1000ms
78
Thread.sleep(150);
79
}
80
} catch (InterruptedException ex) {
81
System.out.println("Consumer Interrupted");
82
}
83
}
84
}
85
}
3示例2
並發庫中的BlockingQueue是一個比較好玩的類,顧名思義,就是阻塞隊列。該類主要提供了兩個方法put()和take(),前者將一個對象放到隊列中,
如果隊列已經滿了,就等待直到有空閑節點;后者從head取一個對象,如果沒有對象,就等待直到有可取的對象。
下面的例子比較簡單,一個讀線程,用於將要處理的文件對象添加到阻塞隊列中,
另外四個寫線程用於取出文件對象,為了模擬寫操作耗時長的特點,特讓線程睡眠一段隨機長度的時間。另外,該Demo也使用到了線程池和原子整型
(AtomicInteger),AtomicInteger可以在並發情況下達到原子化更新,避免使用了synchronized,而且性能非常高。由
於阻塞隊列的put和take操作會阻塞,為了使線程退出,特在隊列中添加了一個“標識”,算法中也叫“哨兵”,當發現這個哨兵后,寫線程就退出。
x
79
}
1
public class LinkedBlockingQueueTest {
2
static long randomTime() {
3
return (long) (Math.random() * 1000);
4
}
5
6
public void testName() throws Exception {
7
AtomicInteger rc = new AtomicInteger();
8
int incrementAndGet = rc.incrementAndGet();
9
System.out.println(incrementAndGet);
10
}
11
12
13
public static void main(String[] args) {
14
// 能容納100個文件
15
final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);
16
// 線程池
17
final ExecutorService exec = Executors.newFixedThreadPool(5);
18
final File root = new File("D:\\JavaLib");
19
// 完成標志
20
final File exitFile = new File("");
21
// 讀個數
22
final AtomicInteger rc = new AtomicInteger();
23
// 寫個數
24
final AtomicInteger wc = new AtomicInteger();
25
// 讀線程
26
Runnable read = new Runnable() {
27
public void run() {
28
scanFile(root);
29
scanFile(exitFile);
30
}
31
public void scanFile(File file) {
32
if (file.isDirectory()) {
33
File[] files = file.listFiles(new FileFilter() {
34
public boolean accept(File pathname) {
35
return pathname.isDirectory() || pathname.getPath().endsWith(".java");
36
}
37
});
38
for (File one : files)
39
scanFile(one);
40
} else {
41
try {
42
int index = rc.incrementAndGet();
43
System.out.println("Read0: " + index + " " + file.getPath());
44
queue.put(file);
45
} catch (InterruptedException e) {
46
}
47
}
48
}
49
};
50
exec.submit(read);
51
// 四個寫線程
52
for (int index = 0; index < 4; index++) {
53
// write thread
54
final int NO = index;
55
Runnable write = new Runnable() {
56
String threadName = "Write" + NO;
57
public void run() {
58
while (true) {
59
try {
60
Thread.sleep(randomTime());
61
int index = wc.incrementAndGet();
62
File file = queue.take();
63
// 隊列已經無對象
64
if (file == exitFile) {
65
// 再次添加"標志",以讓其他線程正常退出
66
queue.put(exitFile);
67
break;
68
}
69
System.out.println(threadName + ": " + index + " " + file.getPath());
70
} catch (InterruptedException e) {
71
}
72
}
73
}
74
};
75
exec.submit(write);
76
}
77
exec.shutdown();
78
}
79
}