線程之間的通信方式


前言 
說到線程之間通信方式:依據我的理解 主要是有兩種吧 
1.是通過共享變量,線程之間通過該變量進行協作通信; 
2.通過隊列(本質上也是線程間共享同一塊內存)來實現消費者和生產者的模式來進行通信;


1.通過線程之間共享變量的方式

  • 這個就有必要說下 wait(),notify(),以及notifyAll() 這三個方法

    • 這三個方法都是屬於Object的方法;所以所有類都可以繼承這三方法; 
      • wait()方法使得當前線程必須要等待,等到另外一個線程調用notify()或者notifyAll()方法。
      • notify()方法會喚醒一個等待當前對象的鎖的線程。而notifyAll()顧名思義;就是喚醒所有在等待中的方法;
      • wait()和notify()方法要求在調用時線程已經獲得了對象的鎖,因此對這兩個方法的調用需要放在synchronized方法或synchronized塊中。
  • 來看下下面這個實例吧 
    -通過wait() 和notifyAll() 來實現多個線程之間加減的demo

package com.zeng.awaitNotify; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 通過共享一個變量,wait()+notify() 來踐行通信 * wait()和notify()方法要求在調用時線程已經獲得了對象的鎖,因此對這兩個方法的調用需要放在synchronized方法或synchronized塊中。 * * 針對兩個線程的時候 沒有問題 * 針對線程一多的時候, 就必須要用notifyAll() * @author leo-zeng * */ public class NumberHolder { private int number; public synchronized void increase(){ while(number !=0){ try { //若是nuber 不為0 時 等待 wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //能執行到這里,說明 已經被喚醒了,並且為0 number ++; System.out.println("我要遞增:"+number); //通知在等待的線程 notifyAll(); } public synchronized void decrease(){ while(number ==0){ try { //若是等於零的時候 等待喚醒 wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //能執行到這里,說明 已經被喚醒了,並且不為0 number --; System.out.println("我要遞減:"+number); notifyAll(); } public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(10); NumberHolder holder =new NumberHolder(); //執行任務 pool.execute(new IncreaseThread(holder)); pool.execute(new DecreaseThread(holder)); pool.execute(new IncreaseThread(holder)); pool.execute(new DecreaseThread(holder)); pool.shutdown(); try { pool.awaitTermination(300,TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 累加的類 * @author leo * */ class IncreaseThread extends Thread{ private NumberHolder numberHolder; public IncreaseThread(NumberHolder numberHolder) { this.numberHolder =numberHolder; } @Override public void run() { for (int i = 0; i < 20; i++) { //每次都有不多的延遲 try { Thread.sleep((long)Math.random()*1000); } catch (InterruptedException e) { e.printStackTrace(); } //執行新增操作 numberHolder.increase(); } } } class DecreaseThread extends Thread{ private NumberHolder holder; public DecreaseThread(NumberHolder holder){ this.holder =holder; } @Override public void run() { for (int i = 0; i <20; i++) { //每次都有不多的延遲 try { Thread.sleep((long)Math.random()*1000); } catch (InterruptedException e) { e.printStackTrace(); } //執行遞減函數 holder.decrease(); } } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114

注意一點:這里用while 不用 if 是因為保證可能多線程中,杜絕可能累加/遞減會進行多次的可能。

  • 使用lock.newCondition().await() 和 signal() 方法實現線程之間交互 
    • 除了上述在synchronized代碼塊中使用 wait和 notify之外呢,其實在在java.util.concurrent包中,有兩個很特殊的工具類,Condition和ReentrantLock,也可以同樣實現線程間的交互協作。 
      • ReentrantLock(重入鎖)和Condition 我在這里不想細說,有興趣的可以去看些jdk源碼。
      • 這里要介紹一下condition中的await()和signal() 方法; 
        我們這邊先看demo 然后再來解釋這兩個的含義:
package com.zeng.awaitNotify; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * 通過共享變量來實現 * @author leo-zeng * */ public class Resource { private int i; private final ReentrantLock lock = new ReentrantLock(); private final Condition condition =lock.newCondition(); public void incr(){ try { //上鎖 lock.lock(); while(i!=0){ //叫停,等待喚醒的信號 condition.await(); } //說明已經得到可以用的信號 i++; System.out.println( "遞增:"+i); //給其他添加信號 condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ //解鎖 lock.unlock(); } } public void decr(){ try { //上鎖 lock.lock(); while(i==0){ //叫停, 等待遞增那邊的信號 condition.await(); } //i !=0 拿到那邊的信號 i--; System.out.println( "遞減:"+i); //給其他添加信號 condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ //解鎖 lock.unlock(); } } public static void main(String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(10); Resource ss = new Resource(); threadPool.submit(new IncrThread(ss)); threadPool.submit(new DecrThread(ss)); threadPool.submit(new IncrThread(ss)); threadPool.submit(new DecrThread(ss)); threadPool.shutdown(); try { threadPool.awaitTermination(300,TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } } class IncrThread extends Thread{ private Resource resource; public IncrThread(Resource resource) { this.resource = resource; } @Override public void run() { for (int i = 0; i <20; i++) { //每次停頓一秒 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } resource.incr(); } } } class DecrThread extends Thread{ private Resource resource; public DecrThread(Resource resource) { this.resource = resource; } @Override public void run() { for (int i = 0; i <20; i++) { //每次停頓一秒 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } resource.decr(); } } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121

從demo中可以看到await()方法,是要先給A(累加)線程加鎖,進入await之后會讓線程沉睡,等待signal信號來叫醒,這是A線程解鎖后會進入沉睡,運行B線程;b線程先加鎖然后進行遞減,當值為0值也會進行也會睡眠的,然后解鎖,把鎖給A。就這樣來進行通信的。


2.通過隊列來實現線程的通信

  • 這里用的是java.util.concurrent包中linkedBlockingQueue 來進行線程間交互; 
    • java.util.concurrent.LinkedBlockingQueue 是一個基於單向鏈表的、范圍任意的(其實是有界的)、FIFO 阻塞隊列。訪問與移除操作是在隊頭進行,添加操作是在隊尾進行,並分別使用不同的鎖進行保護,只有在可能涉及多個節點的操作才同時對兩個鎖進行加鎖。
    • 這里通過共享一個隊列的信息,實現生產者和消費者
package com.zeng.awaitNotify; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; /** * 通過linkedblockingQueue 構建線程間通信 * @author leo-zeng * */ public class LinkedBlockingQueueTest { public static void main(String[] args) { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(); ExecutorService threadPool = Executors.newFixedThreadPool(10); threadPool.execute(new Producer(queue)); threadPool.execute(new Consumer(queue)); if(!threadPool.isShutdown()){ threadPool.shutdown(); try { threadPool.awaitTermination(300, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread{ private LinkedBlockingQueue<String> queue; public Producer(LinkedBlockingQueue<String> queue) { this.queue =queue; } @Override public void run() { for (int i = 0; i < 20; i++) { System.out.println("生產出:"+i); try { Thread.sleep(100); queue.put(new String("producer:"+i)); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer extends Thread{ private LinkedBlockingQueue<?> queue; public Consumer(LinkedBlockingQueue<String> q) { this.queue =q; } @Override public void run() { while(true){ try { System.out.println("consumer 消費了:"+queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } } }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM