一,介紹
本總結我對於JAVA多線程中線程之間的通信方式的理解,主要以代碼結合文字的方式來討論線程間的通信,故摘抄了書中的一些示例代碼。
二,線程間的通信方式
①同步
這里講的同步是指多個線程通過synchronized關鍵字這種方式來實現線程間的通信。
參考示例:
public class MyObject {
synchronized public void methodA() {
//do something....
}
synchronized public void methodB() {
//do some other thing
}
}
public class ThreadA extends Thread {
private MyObject object;
//省略構造方法
@Override
public void run() {
super.run();
object.methodA();
}
}
public class ThreadB extends Thread {
private MyObject object;
//省略構造方法
@Override
public void run() {
super.run();
object.methodB();
}
}
public class Run {
public static void main(String[] args) {
MyObject object = new MyObject();
//線程A與線程B 持有的是同一個對象:object
ThreadA a = new ThreadA(object);
ThreadB b = new ThreadB(object);
a.start();
b.start();
}
}
由於線程A和線程B持有同一個MyObject類的對象object,盡管這兩個線程需要調用不同的方法,但是它們是同步執行的,比如:線程B需要等待線程A執行完了methodA()方法之后,它才能執行methodB()方法。這樣,線程A和線程B就實現了 通信。
這種方式,本質上就是“共享內存”式的通信。多個線程需要訪問同一個共享變量,誰拿到了鎖(獲得了訪問權限),誰就可以執行。
②while輪詢的方式
代碼如下:
1 import java.util.ArrayList;
2 import java.util.List;
3
4 public class MyList {
5
6 private List<String> list = new ArrayList<String>();
7 public void add() {
8 list.add("elements");
9 }
10 public int size() {
11 return list.size();
12 }
13 }
14
15 import mylist.MyList;
16
17 public class ThreadA extends Thread {
18
19 private MyList list;
20
21 public ThreadA(MyList list) {
22 super();
23 this.list = list;
24 }
25
26 @Override
27 public void run() {
28 try {
29 for (int i = 0; i < 10; i++) {
30 list.add();
31 System.out.println("添加了" + (i + 1) + "個元素");
32 Thread.sleep(1000);
33 }
34 } catch (InterruptedException e) {
35 e.printStackTrace();
36 }
37 }
38 }
39
40 import mylist.MyList;
41
42 public class ThreadB extends Thread {
43
44 private MyList list;
45
46 public ThreadB(MyList list) {
47 super();
48 this.list = list;
49 }
50
51 @Override
52 public void run() {
53 try {
54 while (true) {
55 if (list.size() == 5) {
56 System.out.println("==5, 線程b准備退出了");
57 throw new InterruptedException();
58 }
59 }
60 } catch (InterruptedException e) {
61 e.printStackTrace();
62 }
63 }
64 }
65
66 import mylist.MyList;
67 import extthread.ThreadA;
68 import extthread.ThreadB;
69
70 public class Test {
71
72 public static void main(String[] args) {
73 MyList service = new MyList();
74
75 ThreadA a = new ThreadA(service);
76 a.setName("A");
77 a.start();
78
79 ThreadB b = new ThreadB(service);
80 b.setName("B");
81 b.start();
82 }
83 }
在這種方式下,線程A不斷地改變條件,線程ThreadB不停地通過while語句檢測這個條件(list.size()==5)是否成立 ,從而實現了線程間的通信。但是這種方式會浪費CPU資源。之所以說它浪費資源,是因為JVM調度器將CPU交給線程B執行時,它沒做啥“有用”的工作,只是在不斷地測試 某個條件是否成立。就類似於現實生活中,某個人一直看着手機屏幕是否有電話來了,而不是: 在干別的事情,當有電話來時,響鈴通知TA電話來了。關於線程的輪詢的影響,可參考:JAVA多線程之當一個線程在執行死循環時會影響另外一個線程嗎?
這種方式還存在另外一個問題:
輪詢的條件的可見性問題,關於內存可見性問題,可參考:JAVA多線程之volatile 與 synchronized 的比較中的第一點“一,volatile關鍵字的可見性”
線程都是先把變量讀取到本地線程棧空間,然后再去再去修改的本地變量。因此,如果線程B每次都在取本地的 條件變量,那么盡管另外一個線程已經改變了輪詢的條件,它也察覺不到,這樣也會造成死循環。
③wait/notify機制
代碼如下:
1 import java.util.ArrayList;
2 import java.util.List;
3
4 public class MyList {
5
6 private static List<String> list = new ArrayList<String>();
7
8 public static void add() {
9 list.add("anyString");
10 }
11
12 public static int size() {
13 return list.size();
14 }
15 }
16
17
18 public class ThreadA extends Thread {
19
20 private Object lock;
21
22 public ThreadA(Object lock) {
23 super();
24 this.lock = lock;
25 }
26
27 @Override
28 public void run() {
29 try {
30 synchronized (lock) {
31 if (MyList.size() != 5) {
32 System.out.println("wait begin "
33 + System.currentTimeMillis());
34 lock.wait();
35 System.out.println("wait end "
36 + System.currentTimeMillis());
37 }
38 }
39 } catch (InterruptedException e) {
40 e.printStackTrace();
41 }
42 }
43 }
44
45
46 public class ThreadB extends Thread {
47 private Object lock;
48
49 public ThreadB(Object lock) {
50 super();
51 this.lock = lock;
52 }
53
54 @Override
55 public void run() {
56 try {
57 synchronized (lock) {
58 for (int i = 0; i < 10; i++) {
59 MyList.add();
60 if (MyList.size() == 5) {
61 lock.notify();
62 System.out.println("已經發出了通知");
63 }
64 System.out.println("添加了" + (i + 1) + "個元素!");
65 Thread.sleep(1000);
66 }
67 }
68 } catch (InterruptedException e) {
69 e.printStackTrace();
70 }
71 }
72 }
73
74 public class Run {
75
76 public static void main(String[] args) {
77
78 try {
79 Object lock = new Object();
80
81 ThreadA a = new ThreadA(lock);
82 a.start();
83
84 Thread.sleep(50);
85
86 ThreadB b = new ThreadB(lock);
87 b.start();
88 } catch (InterruptedException e) {
89 e.printStackTrace();
90 }
91 }
92 }
線程A要等待某個條件滿足時(list.size()==5),才執行操作。線程B則向list中添加元素,改變list 的size。
A,B之間如何通信的呢?也就是說,線程A如何知道 list.size() 已經為5了呢?
這里用到了Object類的 wait() 和 notify() 方法。
當條件未滿足時(list.size() !=5),線程A調用wait() 放棄CPU,並進入阻塞狀態。---不像②while輪詢那樣占用CPU
當條件滿足時,線程B調用 notify()通知 線程A,所謂通知線程A,就是喚醒線程A,並讓它進入可運行狀態。
這種方式的一個好處就是CPU的利用率提高了。
但是也有一些缺點:比如,線程B先執行,一下子添加了5個元素並調用了notify()發送了通知,而此時線程A還執行;當線程A執行並調用wait()時,那它永遠就不可能被喚醒了。因為,線程B已經發了通知了,以后不再發通知了。這說明:通知過早,會打亂程序的執行邏輯。
④管道通信就是使用java.io.PipedInputStream 和 java.io.PipedOutputStream進行通信
具體就不介紹了。分布式系統中說的兩種通信機制:共享內存機制和消息通信機制。感覺前面的①中的synchronized關鍵字和②中的while輪詢 “屬於” 共享內存機制,由於是輪詢的條件使用了volatile關鍵字修飾時,這就表示它們通過判斷這個“共享的條件變量“是否改變了,來實現進程間的交流。
而管道通信,更像消息傳遞機制,也就是說:通過管道,將一個線程中的消息發送給另一個。

