java多線程管理 concurrent包用法詳解


 

我們都知道,在JDK1.5之前,Java中要進行業務並發時,通常需要有程序員獨立完成代碼實現,當然也有一些開源的框架提供了這些功能,但是這些依然沒有JDK自帶的功能使用起來方便。而當針對高質量Java多線程並發程序設計時,為防止死蹦等現象的出現,比如使用java之前的wait()、notify()和synchronized等,每每需要考慮性能、死鎖、公平性、資源管理以及如何避免線程安全性方面帶來的危害等諸多因素,往往會采用一些較為復雜的安全策略,加重了程序員的開發負擔.萬幸的是,在JDK1.5出現之后,Sun大神(Doug Lea)終於為我們這些可憐的小程序員推出了java.util.concurrent工具包以簡化並發完成。開發者們借助於此,將有效的減少競爭條件(race conditions)和死鎖線程。concurrent包很好的解決了這些問題,為我們提供了更實用的並發程序模型。

Executor                  :具體Runnable任務的執行者。
ExecutorService           :一個線程池管理者,其實現類有多種,我會介紹一部分。我們能把Runnable,Callable提交到池中讓其調度。
Semaphore                 :一個計數信號量
ReentrantLock             :一個可重入的互斥鎖定 Lock,功能類似synchronized,但要強大的多。
Future                    :是與Runnable,Callable進行交互的接口,比如一個線程執行結束后取返回的結果等等,還提供了cancel終止線程。
BlockingQueue             :阻塞隊列。
CompletionService         : ExecutorService的擴展,可以獲得線程執行結果的
CountDownLatch            :一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。 
CyclicBarrier             :一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 
Future                    :Future 表示異步計算的結果。
ScheduledExecutorService :一個 ExecutorService,可安排在給定的延遲后運行或定期執行的命令。
接下來逐一介紹
Executors主要方法說明
newFixedThreadPool(固定大小線程池)
創建一個可重用固定線程集合的線程池,以共享的無界隊列方式來運行這些線程(只有要請求的過來,就會在一個隊列里等待執行)。如果在關閉前的執行期間由於失敗而導致任何線程終止,那么一個新線程將代替它執行后續的任務(如果需要)。
newCachedThreadPool(無界線程池,可以進行自動線程回收)
創建一個可根據需要創建新線程的線程池,但是在以前構造的線程可用時將重用它們。對於執行很多短期異步任務的程序而言,這些線程池通常可提高程序性能。調用 execute 將重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則創建一個新線程並添加到池中。終止並從緩存中移除那些已有 60 秒鍾未被使用的線程。因此,長時間保持空閑的線程池不會使用任何資源。注意,可以使用 ThreadPoolExecutor 構造方法創建具有類似屬性但細節不同(例如超時參數)的線程池。
newSingleThreadExecutor(單個后台線程)
創建一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。(注意,如果因為在關閉前的執行期間出現失敗而終止了此單個線程,那么如果需要,一個新線程將代替它執行后續的任務)。可保證順序地執行各個任務,並且在任意給定的時間不會有多個線程是活動的。與其他等效的 newFixedThreadPool(1) 不同,可保證無需重新配置此方法所返回的執行程序即可使用其他的線程。
這些方法返回的都是ExecutorService對象,這個對象可以理解為就是一個線程池。
這個線程池的功能還是比較完善的。可以提交任務submit()可以結束線程池shutdown()。
01 import java.util.concurrent.ExecutorService;
02 import java.util.concurrent.Executors;
03 public class MyExecutor extends Thread {
04 private int index;
05 public MyExecutor(int i){
06     this.index=i;
07 }
08 public void run(){
09     try{
10      System.out.println("["+this.index+"] start....");
11      Thread.sleep((int)(Math.random()*1000));
12      System.out.println("["+this.index+"] end.");
13     }
14     catch(Exception e){
15      e.printStackTrace();
16     }
17 }
18 public static void main(String args[]){
19     ExecutorService service=Executors.newFixedThreadPool(4);
20     for(int i=0;i<10;i++){
21      service.execute(new MyExecutor(i));
22      //service.submit(new MyExecutor(i));
23     }
24     System.out.println("submit finish");
25     service.shutdown();
26 }
27 }

雖然打印了一些信息,但是看的不是非常清晰,這個線程池是如何工作的,我們來將休眠的時間調長10倍。
Thread.sleep((int)(Math.random()*10000));
再來看,會清楚看到只能執行4個線程。當執行完一個線程后,才會又執行一個新的線程,也就是說,我們將所有的線程提交后,線程池會等待執行完最后shutdown。我們也會發現,提交的線程被放到一個“無界隊列里”。這是一個有序隊列(BlockingQueue,這個下面會說到)。
另外它使用了Executors的靜態函數生成一個固定的線程池,顧名思義,線程池的線程是不會釋放的,即使它是Idle。
這就會產生性能問題,比如如果線程池的大小為200,當全部使用完畢后,所有的線程會繼續留在池中,相應的內存和線程切換(while(true)+sleep循環)都會增加。
如果要避免這個問題,就必須直接使用ThreadPoolExecutor()來構造。可以像通用的線程池一樣設置“最大線程數”、“最小線程數”和“空閑線程keepAlive的時間”。

這個就是線程池基本用法。
Semaphore
一個計數信號量。從概念上講,信號量維護了一個許可集合。如有必要,在許可可用前會阻塞每一個 acquire(),然后再獲取該許可。每個 release() 添加一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數,並采取相應的行動。
Semaphore 通常用於限制可以訪問某些資源(物理或邏輯的)的線程數目。例如,下面的類使用信號量控制對內容池的訪問:
這里是一個實際的情況,大家排隊上廁所,廁所只有兩個位置,來了10個人需要排隊。
01 import java.util.concurrent.ExecutorService;
02 import java.util.concurrent.Executors;
03 import java.util.concurrent.Semaphore;
04 public class MySemaphore extends Thread {
05 Semaphore position;
06 private int id;
07 public MySemaphore(int i,Semaphore s){
08     this.id=i;
09     this.position=s;
10 }
11 public void run(){
12     try{
13      if(position.availablePermits()>0){
14       System.out.println("顧客["+this.id+"]進入廁所,有空位");
15      }
16      else{
17       System.out.println("顧客["+this.id+"]進入廁所,沒空位,排隊");
18      }
19      position.acquire();
20      System.out.println("顧客["+this.id+"]獲得坑位");
21      Thread.sleep((int)(Math.random()*1000));
22      System.out.println("顧客["+this.id+"]使用完畢");
23      position.release();
24     }
25     catch(Exception e){
26      e.printStackTrace();
27     }
28 }
29 public static void main(String args[]){
30     ExecutorService list=Executors.newCachedThreadPool();
31     Semaphore position=new Semaphore(2);
32     for(int i=0;i<10;i++){
33      list.submit(new MySemaphore(i+1,position));
34     }
35     list.shutdown();
36     position.acquireUninterruptibly(2);
37     System.out.println("使用完畢,需要清掃了");
38     position.release(2);
39 }
40 }
ReentrantLock
一個可重入的互斥鎖定 Lock,它具有與使用 synchronized 方法和語句所訪問的隱式監視器鎖定相同的一些基本行為和語義,但功能更強大。
ReentrantLock 將由最近成功獲得鎖定,並且還沒有釋放該鎖定的線程所擁有。當鎖定沒有被另一個線程所擁有時,調用 lock 的線程將成功獲取該鎖定並返回。如果當前線程已經擁有該鎖定,此方法將立即返回。可以使用 isHeldByCurrentThread() 和 getHoldCount() 方法來檢查此情況是否發生。
此類的構造方法接受一個可選的公平參數。
當設置為 true時,在多個線程的爭用下,這些鎖定傾向於將訪問權授予等待時間最長的線程。否則此鎖定將無法保證任何特定訪問順序。
與采用默認設置(使用不公平鎖定)相比,使用公平鎖定的程序在許多線程訪問時表現為很低的總體吞吐量(即速度很慢,常常極其慢),但是在獲得鎖定和保證鎖定分配的均衡性時差異較小。不過要注意的是,公平鎖定不能保證線程調度的公平性。因此,使用公平鎖定的眾多線程中的一員可能獲得多倍的成功機會,這種情況發生在其他活動線程沒有被處理並且目前並未持有鎖定時。還要注意的是,未定時的 tryLock 方法並沒有使用公平設置。因為即使其他線程正在等待,只要該鎖定是可用的,此方法就可以獲得成功。
建議總是 立即實踐,使用 try 塊來調用 lock,在之前/之后的構造中,最典型的代碼如下: 
01 class X {
02     private final ReentrantLock lock = new ReentrantLock();
03     // ...
04     public void m() {
05       lock.lock(); // block until condition holds
06       try {
07         // ... method body
08       } finally {
09         lock.unlock()
10       }
11     }
12 }
我的例子:
01 import java.util.concurrent.ExecutorService;
02 import java.util.concurrent.Executors;
03 import java.util.concurrent.locks.ReentrantLock;
04 public class MyReentrantLock extends Thread{
05 TestReentrantLock lock;
06 private int id;
07 public MyReentrantLock(int i,TestReentrantLock test){
08     this.id=i;
09     this.lock=test;
10 }
11 public void run(){
12     lock.print(id);
13 }
14 public static void main(String args[]){
15     ExecutorService service=Executors.newCachedThreadPool();
16     TestReentrantLock lock=new TestReentrantLock();
17     for(int i=0;i<10;i++){
18      service.submit(new MyReentrantLock(i,lock));
19     }
20     service.shutdown();
21 }
22 }
23 class TestReentrantLock{
24 private ReentrantLock lock=new ReentrantLock();
25 public void print(int str){
26     try{
27      lock.lock();
28      System.out.println(str+"獲得");
29      Thread.sleep((int)(Math.random()*1000));
30     }
31     catch(Exception e){
32      e.printStackTrace();
33     }
34     finally{
35      System.out.println(str+"釋放");
36      lock.unlock();
37     }
38 }
39 }
BlockingQueue
支持兩個附加操作的 Queue,這兩個操作是:檢索元素時等待隊列變為非空,以及存儲元素時等待空間變得可用。
BlockingQueue 不接受 null 元素。試圖 add、put 或 offer 一個 null 元素時,某些實現會拋出 NullPointerException。null 被用作指示 poll 操作失敗的警戒值。
BlockingQueue 可以是限定容量的。它在任意給定時間都可以有一個 remainingCapacity,超出此容量,便無法無阻塞地 put 額外的元素。
沒有任何內部容量約束的 BlockingQueue 總是報告 Integer.MAX_VALUE 的剩余容量。
BlockingQueue 實現主要用於生產者-使用者隊列,但它另外還支持 Collection 接口。因此,舉例來說,使用 remove(x) 從隊列中移除任意一個元素是有可能的。
然而,這種操作通常不 會有效執行,只能有計划地偶爾使用,比如在取消排隊信息時。
BlockingQueue 實現是線程安全的。所有排隊方法都可以使用內部鎖定或其他形式的並發控制來自動達到它們的目的。
然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)沒有 必要自動執行,除非在實現中特別說明。
因此,舉例來說,在只添加了 c 中的一些元素后,addAll(c) 有可能失敗(拋出一個異常)。
BlockingQueue 實質上不 支持使用任何一種“close”或“shutdown”操作來指示不再添加任何項。
這種功能的需求和使用有依賴於實現的傾向。例如,一種常用的策略是:對於生產者,插入特殊的 end-of-stream 或 poison 對象,並根據使用者獲取這些對象的時間來對它們進行解釋。
下面的例子演示了這個阻塞隊列的基本功能。
01 import java.util.concurrent.BlockingQueue;
02 import java.util.concurrent.ExecutorService;
03 import java.util.concurrent.Executors;
04 import java.util.concurrent.LinkedBlockingQueue;
05 public class MyBlockingQueue extends Thread {
06 public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
07 private int index;
08 public MyBlockingQueue(int i) {
09    this.index = i;
10 }
11 public void run() {
12    try {
13     queue.put(String.valueOf(this.index));
14     System.out.println("{" + this.index + "} in queue!");
15    } catch (Exception e) {
16     e.printStackTrace();
17    }
18 }
19 public static void main(String args[]) {
20    ExecutorService service = Executors.newCachedThreadPool();
21    for (int i = 0; i < 10; i++) {
22     service.submit(new MyBlockingQueue(i));
23    }
24    Thread thread = new Thread() {
25     public void run() {
26      try {
27       while (true) {
28        Thread.sleep((int) (Math.random() * 1000));
29        if(MyBlockingQueue.queue.isEmpty())
30         break;
31        String str = MyBlockingQueue.queue.take();
32        System.out.println(str + " has take!");
33       }
34      } catch (Exception e) {
35       e.printStackTrace();
36      }
37     }
38    };
39    service.submit(thread);
40    service.shutdown();
41 }
42 }
---------------------執行結果-----------------
{0} in queue!
{1} in queue!
{2} in queue!
{3} in queue!
0 has take!
{4} in queue!
1 has take!
{6} in queue!
2 has take!
{7} in queue!
3 has take!
{8} in queue!
4 has take!
{5} in queue!
6 has take!
{9} in queue!
7 has take!
8 has take!
5 has take!
9 has take!
-----------------------------------------

CompletionService
將生產新的異步任務與使用已完成任務的結果分離開來的服務。生產者 submit 執行的任務。使用者 take 已完成的任務,
並按照完成這些任務的順序處理它們的結果。例如,CompletionService 可以用來管理異步 IO ,執行讀操作的任務作為程序或系統的一部分提交,
然后,當完成讀操作時,會在程序的不同部分執行其他操作,執行操作的順序可能與所請求的順序不同。
通常,CompletionService 依賴於一個單獨的 Executor 來實際執行任務,在這種情況下,
CompletionService 只管理一個內部完成隊列。ExecutorCompletionService 類提供了此方法的一個實現。

01 import java.util.concurrent.Callable;
02 import java.util.concurrent.CompletionService;
03 import java.util.concurrent.ExecutorCompletionService;
04 import java.util.concurrent.ExecutorService;
05 import java.util.concurrent.Executors;
06 public class MyCompletionService implements Callable<String> {
07 private int id;
08  
09 public MyCompletionService(int i){
10    this.id=i;
11 }
12 public static void main(String[] args) throws Exception{
13    ExecutorService service=Executors.newCachedThreadPool();
14    CompletionService<String> completion=new ExecutorCompletionService<String>(service);
15    for(int i=0;i<10;i++){
16     completion.submit(new MyCompletionService(i));
17    }
18    for(int i=0;i<10;i++){
19     System.out.println(completion.take().get());
20    }
21    service.shutdown();
22 }
23 public String call() throws Exception {
24    Integer time=(int)(Math.random()*1000);
25    try{
26     System.out.println(this.id+" start");
27     Thread.sleep(time);
28     System.out.println(this.id+" end");
29    }
30    catch(Exception e){
31     e.printStackTrace();
32    }
33    return this.id+":"+time;
34 }
35 }

CountDownLatch

一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。
用給定的計數 初始化 CountDownLatch。由於調用了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。
之后,會釋放所有等待的線程,await 的所有后續調用都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。
CountDownLatch 是一個通用同步工具,它有很多用途。將計數 1 初始化的 CountDownLatch 用作一個簡單的開/關鎖存器,
或入口:在通過調用 countDown() 的線程打開入口前,所有調用 await 的線程都一直在入口處等待。
用 N 初始化的 CountDownLatch 可以使一個線程在 N 個線程完成某項操作之前一直等待,或者使其在某項操作完成 N 次之前一直等待。
CountDownLatch 的一個有用特性是,它不要求調用 countDown 方法的線程等到計數到達零時才繼續,
而在所有線程都能通過之前,它只是阻止任何線程繼續通過一個 await。 
一下的例子是別人寫的,非常形象。
01 import java.util.concurrent.CountDownLatch;
02 import java.util.concurrent.ExecutorService;
03 import java.util.concurrent.Executors;
04 public class TestCountDownLatch {
05 public static void main(String[] args) throws InterruptedException {
06    // 開始的倒數鎖
07    final CountDownLatch begin = new CountDownLatch(1);
08    // 結束的倒數鎖
09    final CountDownLatch end = new CountDownLatch(10);
10    // 十名選手
11    final ExecutorService exec = Executors.newFixedThreadPool(10);
12    
13    for (int index = 0; index < 10; index++) {
14     final int NO = index + 1;
15     Runnable run = new Runnable() {
16      public void run() {
17       try {
18        begin.await();//一直阻塞
19        Thread.sleep((long) (Math.random() * 10000));
20        System.out.println("No." + NO + " arrived");
21       } catch (InterruptedException e) {
22       } finally {
23        end.countDown();
24       }
25      }
26     };
27     exec.submit(run);
28    }
29    System.out.println("Game Start");
30    begin.countDown();
31    end.await();
32    System.out.println("Game Over");
33    exec.shutdown();
34 }
35 }

CountDownLatch最重要的方法是countDown()和await(),前者主要是倒數一次,后者是等待倒數到0,如果沒有到達0,就只有阻塞等待了。

CyclicBarrier
一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。
在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待線程后可以重用,所以稱它為循環 的 barrier。
CyclicBarrier 支持一個可選的 Runnable 命令,在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),
該命令只在每個屏障點運行一次。若在繼續所有參與線程之前更新共享狀態,此屏障操作 很有用。
示例用法:下面是一個在並行分解設計中使用 barrier 的例子,很經典的旅行團例子:
01 import java.text.SimpleDateFormat;
02 import java.util.Date;
03 import java.util.concurrent.BrokenBarrierException;
04 import java.util.concurrent.CyclicBarrier;
05 import java.util.concurrent.ExecutorService;
06 import java.util.concurrent.Executors;
07 public class TestCyclicBarrier {
08   // 徒步需要的時間: Shenzhen, Guangzhou, Shaoguan, Changsha, Wuhan
09   private static int[] timeWalk = { 5, 8, 15, 15, 10 };
10   // 自駕游
11   private static int[] timeSelf = { 1, 3, 4, 4, 5 };
12   // 旅游大巴
13   private static int[] timeBus = { 2, 4, 6, 6, 7 };
14    
15   static String now() {
16      SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
17      return sdf.format(new Date()) + ": ";
18   }
19   static class Tour implements Runnable {
20      private int[] times;
21      private CyclicBarrier barrier;
22      private String tourName;
23      public Tour(CyclicBarrier barrier, String tourName, int[] times) {
24        this.times = times;
25        this.tourName = tourName;
26        this.barrier = barrier;
27      }
28      public void run() {
29        try {
30          Thread.sleep(times[0] * 1000);
31          System.out.println(now() + tourName + " Reached Shenzhen");
32          barrier.await();
33          Thread.sleep(times[1] * 1000);
34          System.out.println(now() + tourName + " Reached Guangzhou");
35          barrier.await();
36          Thread.sleep(times[2] * 1000);
37          System.out.println(now() + tourName + " Reached Shaoguan");
38          barrier.await();
39          Thread.sleep(times[3] * 1000);
40          System.out.println(now() + tourName + " Reached Changsha");
41          barrier.await();
42          Thread.sleep(times[4] * 1000);
43          System.out.println(now() + tourName + " Reached Wuhan");
44          barrier.await();
45        } catch (InterruptedException e) {
46        } catch (BrokenBarrierException e) {
47        }
48      }
49   }
50   public static void main(String[] args) {
51      // 三個旅行團
52      CyclicBarrier barrier = new CyclicBarrier(3);
53      ExecutorService exec = Executors.newFixedThreadPool(3);
54      exec.submit(new Tour(barrier, "WalkTour", timeWalk));
55      exec.submit(new Tour(barrier, "SelfTour", timeSelf));
56 //當我們把下面的這段代碼注釋后,會發現,程序阻塞了,無法繼續運行下去。
57      exec.submit(new Tour(barrier, "BusTour", timeBus));
58      exec.shutdown();
59   }
60 }

CyclicBarrier最重要的屬性就是參與者個數,另外最要方法是await()。當所有線程都調用了await()后,就表示這些線程都可以繼續執行,否則就會等待。
Future
Future 表示異步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,並檢索計算的結果。
計算完成后只能使用 get 方法來檢索結果,如有必要,計算完成前可以阻塞此方法。取消則由 cancel 方法來執行。
還提供了其他方法,以確定任務是正常完成還是被取消了。一旦計算完成,就不能再取消計算。
如果為了可取消性而使用 Future但又不提供可用的結果,則可以聲明 Future<?> 形式類型、並返回 null 作為基礎任務的結果。
這個我們在前面CompletionService已經看到了,這個Future的功能,而且這個可以在提交線程的時候被指定為一個返回對象的。

ScheduledExecutorService
一個 ExecutorService,可安排在給定的延遲后運行或定期執行的命令。
schedule 方法使用各種延遲創建任務,並返回一個可用於取消或檢查執行的任務對象。scheduleAtFixedRate 和 scheduleWithFixedDelay 方法創建並執行某些在取消前一直定期運行的任務。
用 Executor.execute(java.lang.Runnable) 和 ExecutorService 的 submit 方法所提交的命令,通過所請求的 0 延遲進行安排。
schedule 方法中允許出現 0 和負數延遲(但不是周期),並將這些視為一種立即執行的請求。
所有的 schedule 方法都接受相對 延遲和周期作為參數,而不是絕對的時間或日期。將以 Date 所表示的絕對時間轉換成要求的形式很容易。
例如,要安排在某個以后的日期運行,可以使用:schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)。
但是要注意,由於網絡時間同步協議、時鍾漂移或其他因素的存在,因此相對延遲的期滿日期不必與啟用任務的當前 Date 相符。
Executors 類為此包中所提供的 ScheduledExecutorService 實現提供了便捷的工廠方法。
一下的例子也是網上比較流行的。
01 import static java.util.concurrent.TimeUnit.SECONDS;
02 import java.util.Date;
03 import java.util.concurrent.Executors;
04 import java.util.concurrent.ScheduledExecutorService;
05 import java.util.concurrent.ScheduledFuture;
06 public class TestScheduledThread {
07 public static void main(String[] args) {
08    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
09    final Runnable beeper = new Runnable() {
10     int count = 0;
11     public void run() {
12      System.out.println(new Date() + " beep " + (++count));
13     }
14    };
15    // 1秒鍾后運行,並每隔2秒運行一次
16    final ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate(beeper, 1, 2, SECONDS);
17    // 2秒鍾后運行,並每次在上次任務運行完后等待5秒后重新運行
18    final ScheduledFuture beeperHandle2 = scheduler.scheduleWithFixedDelay(beeper, 2, 5, SECONDS);
19    // 30秒后結束關閉任務,並且關閉Scheduler
20    scheduler.schedule(new Runnable() {
21     public void run() {
22      beeperHandle.cancel(true);
23      beeperHandle2.cancel(true);
24      scheduler.shutdown();
25     }
26    }, 30, SECONDS);
27 }
28 }

這樣我們就把concurrent包下比較重要的功能都已經總結完了,希望對我們理解能有幫助。

 

 

 

 

 

 

==========================================

 

 

 

JAVA concurrent

本文主要講解Java並發相關的內容,包括鎖、信號量、堵塞隊列、線程池等主要內容。

並發的優點和缺點

在講述怎么利用多線程的情況下,我們先看一下采用多線程並發的優缺點。

優點

  • 提高資源利用率
    如讀取一個目錄下的所有文件,如果采用單線程模型,則從磁盤讀取文件的時候,大部分CPU用於等待磁盤去讀取數據。如果是采用多線程並發執行,則CPU可以在等待IO的時候去做其他的事情,以提高CPU的使用率,減少資源的浪費。

  • 程序響應速度好
    單線程模型下,假設一個http請求需要占用大量的時間來處理,則其他的請求無法發送請求給服務端。而多線程模式下,監聽線程把請求傳遞給工作者線程,然后立刻返回去監聽,可以去接收新的請求,而工作者線程則能夠處理這個請求並發送一個回復給客戶端。明顯響應速度比單線程模型要好得多。

    缺點

  • 程序設計復雜度
    多線程情況下,需要考慮線程間的通信、共享資源的訪問,相對而言要比單線程程序負責一些。

  • 上下文切換開銷大
    CPU從執行一個線程切換到執行另外一個線程的時候,它需要先存儲當前線程的本地的數據,程序指針等,然后載入另一個線程的本地數據,程序指針等,最后才開始執行。這種切換稱為“上下文切換”。CPU會在一個上下文中執行一個線程,然后切換到另外一個上下文中執行另外一個線程。尤其是當線程數量較多時,這種開銷很明顯。

  • 資源消耗
    線程在運行的時候需要從計算機里面得到一些資源。除了CPU,線程還需要一些內存來維持它本地的堆棧。它也需要占用操作系統中一些資源來管理線程

    並發模型

    並發系統可以采用多種並發編程模型來實現。並發模型指定了系統中的線程如何通過協作來完成分配給它們的作業。不同的並發模型采用不同的方式拆分作業,同時線程間的協作和交互方式也不相同。

    並行工作者

    在並行工作者模型中,委派者(Delegator)將傳入的作業分配給不同的工作者。每個工作者完成整個任務。工作者們並行運作在不同的線程上,甚至可能在不同的CPU上。

    假設電商系統中的秒殺活動采用了並行工作者模型,訂單->財務->倉儲->物流,工作者A拿到訂單請求,然后負責支付流程,查詢倉儲情況,直到發貨。
    在Java應用系統中,並行工作者模型是最常見的並發模型,java.util.concurrent包中的許多並發實用工具都是設計用於這個模型的。

優點
易於理解,可以添加更多的工作者來提高系統的並行度
缺點

  • 共享狀態可能會很復雜
    在上面的電商系統中,由於共享的工作者經常需要訪問一些共享數據,無論是內存中的或者共享的數據庫中的。
    在等待訪問共享數據結構時,線程之間的互相等待將會丟失部分並行性。許多並發數據結構是阻塞的,意味着在任何一個時間只有一個或者很少的線程能夠訪問。這樣會導致在這些共享數據結構上出現競爭狀態。在執行需要訪問共享數據結構部分的代碼時,高競爭基本上會導致執行時出現一定程度的串行化。

  • 無狀態的工作者
    每次都重讀需要的數據,將會導致速度變慢,特別是狀態保存在外部數據庫中的時候。

  • 任務順序是不確定的
    作業執行順序是不確定的,無法保證哪個作業最先或者最后被執行。如A先下單,B后下單,不根據時間進行業務邏輯的判斷,不能有可能B先於A收到貨。

流水線模式

流水線模式中,每個工作者只負責作業中的部分工作。當完成了自己的這部分工作時工作者會將作業轉發給下一個工作者。每個工作者在自己的線程中運行,並且不會和其他工作者共享狀態。也稱反應器系統,或事件驅動系統。

以秒殺為例,工作者A執行訂單的處理,工作者B執行支付,工作者C檢查倉儲,工作者D負責物流,分工明確,各司其職。
在實際應用中,作業有可能不會沿着單一流水線進行。由於大多數系統可以執行多個作業,作業從一個工作者流向另一個工作者取決於作業需要做的工作。在實際中可能會有多個不同的虛擬流水線同時運行。

作業甚至也有可能被轉發到超過一個工作者上並發處理。比如說,作業有可能被同時轉發到作業執行器和作業日志器。下圖說明了三條流水線是如何通過將作業轉發給同一個工作者(中間流水線的最后一個工作者)來完成作業:

優點

  • 無需共享的狀態
    工作者之間無需共享狀態,意味着實現的時候無需考慮所有因並發訪問共享對象而產生的並發性問題

  • 較好的硬件整合
    單線程代碼在整合底層硬件的時候往往具有更好的優勢。首先,當能確定代碼只在單線程模式下執行的時候,通常能夠創建更優化的數據結構和算法。

  • 合理的作業順序
    基於流水線並發模型實現的並發系統,在某種程度上是有可能保證作業的順序的。作業的有序性使得它更容易地推出系統在某個特定時間點的狀態

缺點

  • 編寫難度大
    好在有一些平台框架可以直接使用,如Akka,Node.JS

  • 跟蹤困難
    流水線並發模型最大的缺點是作業的執行往往分布到多個工作者上,並因此分布到項目中的多個類上。這樣導致在追蹤某個作業到底被什么代碼執行時變得困難。

函數式並行

函數式並行的基本思想是采用函數調用實現程序。函數可以看作是代理人agents或者actor,函數之間可以像流水線模型(反應器或者事件驅動系統)那樣互相發送消息。
函數都是通過拷貝來傳遞參數的,所以除了接收函數外沒有實體可以操作數據。這對於避免共享數據的競態來說是很有必要的。同樣也使得函數的執行類似於原子操作。每個函數調用的執行獨立於任何其他函數的調用。

Runnable、Callable、Future、Thread、FutureTask

Java並發中主要以RunnableCallableFuture三個接口作為基礎。

Runnable

實例想要被線程執行,可以通過實現Runnable接口。
。通過實例化某個 Thread 實例並將自身作為運行目標,就可以運行實現 Runnable 的類而無需創建 Thread 的子類。大多數情況下,如果只想重寫 run() 方法,而不重寫其他 Thread 方法,那么應使用 Runnable 接口。這很重要,因為除非程序員打算修改或增強類的基本行為,否則不應為該類創建子類。

Callable

Callable 接口類似於 Runnable,兩者都是為那些其實例可能被另一個線程執行的類設計的。但是 Runnable不會返回結果,並且無法拋出經過檢查的異常。

Future

Future 表示異步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。計算完成后只能使用 get 方法來獲取結果,如有必要,計算完成前可以阻塞此方法。取消則由 cancel 方法來執行。還提供了其他方法,以確定任務是正常完成還是被取消了。一旦計算完成,就不能再取消計算。如果為了可取消性而使用 Future 但又不提供可用的結果,則可以聲明 Future<?> 形式類型、並返回 null 作為底層任務的結果。
主要方法如下:

  • cancel(boolean mayInterruptIfRunning)
    試圖取消對此任務的執行。

  • get()
    如有必要,等待計算完成,然后獲取其結果。

  • get(long timeout, TimeUnit unit)
    如有必要,最多等待為使計算完成所給定的時間之后,獲取其結果(如果結果可用)。

  • isCancelled()
    如果在任務正常完成前將其取消,則返回 true。

  • isDone()
    如果任務已完成,則返回 true。

Thread

線程的創建

Java中,我們有2個方式創建線程:

  • 通過直接繼承thread類,然后覆蓋run()方法。

  • 構建一個實現Runnable接口的類, 然后創建一個thread類對象並傳遞Runnable對象作為構造參數

    線程的運行流程

    我們在主線程中創建5個子線程,每個子線程通過構造函數初始化number的值,來實現1-5內的乘法表:

      package com.molyeo.java.concurrent;  public class ThreadTest {      public static void main(String[] args) {
              System.out.println("main thread start");          for (int i = 1; i <= 5; i++) {
                  Calculator calculator = new Calculator(i);
                  Thread thread = new Thread(calculator);
                  thread.start();
              }
              System.out.println("main thread end");
          }
      }   
      class Calculator implements Runnable {  
          private int number; 
          public Calculator(int number) {          this.number = number;
          }   
          @Override
          public void run() {          for (int i = 1; i <= 5; i++) {
                  System.out.printf("%s: %d * %d = %d \n", Thread.currentThread().getName(), number, i, i * number);
              }
          }
      }

程序輸出如下:

main thread startThread-0: 1 * 1 = 1 Thread-0: 1 * 2 = 2 Thread-0: 1 * 3 = 3 Thread-0: 1 * 4 = 4 Thread-0: 1 * 5 = 5 Thread-4: 5 * 1 = 5 Thread-4: 5 * 2 = 10 Thread-4: 5 * 3 = 15 Thread-4: 5 * 4 = 20 Thread-4: 5 * 5 = 25 Thread-3: 4 * 1 = 4 Thread-3: 4 * 2 = 8 Thread-2: 3 * 1 = 3 Thread-2: 3 * 2 = 6 Thread-2: 3 * 3 = 9 Thread-2: 3 * 4 = 12 Thread-1: 2 * 1 = 2 Thread-1: 2 * 2 = 4 Thread-1: 2 * 3 = 6 main thread endThread-1: 2 * 4 = 8 Thread-3: 4 * 3 = 12 Thread-3: 4 * 4 = 16 Thread-3: 4 * 5 = 20 Thread-2: 3 * 5 = 15 Thread-1: 2 * 5 = 10

在Java中,每個應用程序最少有一個執行線程,運行程序時,JVM負責調用main()方法的執行線程。
當全部的非守護線程執行結束時,Java程序才算結束。從輸出中也可以看到,主程序輸出main thread end后,其他程序還是繼續執行,直到執行結束。
需要注意的是,如果某個線程調用System.exit()指示終結程序,那么全部的線程都會結束執行。

線程中斷、睡眠、設置優先級

下面的示例中,NumberGenerator中首先創建numberGenetorThread線程,並設置優先級,啟動線程后,一直循環運行,打印出number的值,直到5毫秒后主線程調用interrupt()方法讓其中斷,numberGenetorThread線程其跳出while循環。首次調用方法isInterrupted()返回值為true,表示線程已中斷。
需要注意的是,interrupt()方法測試當前線程是否已經中斷,線程的中斷狀態也由該方法清除。換句話說,如果連續兩次調用該方法,則第二次調用將返回 false。大家可以打開下面的注釋去測試。

package com.molyeo.java.concurrent;/**
 * Created by zhangkh on 2018/8/23.
 */public class ThreadTest2 {    public static void main(String[] args) throws InterruptedException {
        Thread numberGenetorThread = new NumberGenerator(0);
        numberGenetorThread.setPriority(Thread.MAX_PRIORITY);
        numberGenetorThread.start();
        Thread.sleep(5);
        numberGenetorThread.interrupt();
        System.out.println("first interrupt,isInterrupted=" + numberGenetorThread.isInterrupted());//        Thread.sleep(5);//        numberGenetorThread.interrupt();//        System.out.println("second interrupt,isInterrupted=" + numberGenetorThread.isInterrupted());

    }
}class NumberGenerator extends Thread {    private int number;    public NumberGenerator(int number) {        this.number = number;
    }    @Override
    public void run() {        while (!isInterrupted()) {
            System.out.println("number is " + number);
            number++;
        }
        System.out.println("NumberGenerator thread,isInterrupted= " + this.isInterrupted());
    }
}

程序部分輸出如下:

number is 96number is 97NumberGenerator thread,isInterrupted= truefirst interrupt,isInterrupted=true

ThreadLocal

定義和作用
ThreadLocal稱線程本地變量,並不是為了解決共享對象的多線程訪問的問題的,因為如果ThreadLocal.set()放進去的本來就是多線程共享的同一個對象的話,線程通過ThreadLocal.get()方法得到的還是共享對象本身,依舊存在並發訪問的問題。其是每個線程所單獨持有的,主要是提供了保持對象的方法和避免參數傳遞,以方便對象的訪問。

  • 每個線程中都有一個自己的ThreadLocalMap類對象,可以將線程自己的對象保持到其中,各管各的,線程可以正確的訪問到自己的對象。

  • 將一個共用的ThreadLocal靜態實例作為key,將不同對象的引用保存到不同線程的ThreadLocalMap中,然后在線程執行的各處通過這個靜態ThreadLocal實例的get()方法取得自己線程保存的那個對象,避免了將這個對象作為參數傳遞的麻煩。

程序運行時,每個線程都保持對其線程局部變量副本的隱式引用,只要線程是活動的並且 ThreadLocal 實例是可訪問的;在線程消失之后,其線程局部實例的所有副本都會被垃圾回收(除非存在對這些副本的其他引用)。

使用示例
如下我們創建ThreadLocal的實例stringLocal,分別在主線程和子線程中設置其值為當前線程名字。查看輸出的結果可以看到線程間彼此不干擾,各自輸出自己設置的值。

package com.molyeo.java.concurrent;/**
 * Created by zhangkh on 2018/8/24.
 */public class ThreadLocalDemo {    public static void main(String[] args) throws InterruptedException {
        ThreadLocal<String> stringLocal = new ThreadLocal<String>();
        stringLocal.set(Thread.currentThread().getName());
        System.out.println(String.format("threadName=%10s,threadLocal valaue=%10s",Thread.currentThread().getName(),stringLocal.get()) );

        Thread thread1 = new Thread() {            public void run() {
                stringLocal.set(Thread.currentThread().getName());
                System.out.println(String.format("threadName=%10s,threadLocal valaue=%10s",Thread.currentThread().getName(),stringLocal.get()) );
            }
        };
        thread1.start();
        thread1.join();

        System.out.println(String.format("threadName=%10s,threadLocal valaue=%10s",Thread.currentThread().getName(),stringLocal.get()) );
    }
}

程序輸出如下:

threadName=      main,threadLocal valaue=      mainthreadName=  Thread-0,threadLocal valaue=  Thread-0threadName=      main,threadLocal valaue=      main

源碼實現
ThreadLocal有3個成員變量

private final int threadLocalHashCode = nextHashCode();private static AtomicInteger nextHashCode = new AtomicInteger();private static final int HASH_INCREMENT = 0x61c88647;private static int nextHashCode() {    return nextHashCode.getAndAdd(HASH_INCREMENT);
}

nextHashCodeThreadLocal的靜態變量,HASH_INCREMENT是靜態常量,只有threadLocalHashCodeThreadLocal實例的變量。
在創建ThreadLocal類實例的時候,將ThreadLocal類的下一個hashCode值即nextHashCode的值賦給實例的threadLocalHashCode,然后nextHashCode的值增加HASH_INCREMENT這個值。而實例變量threadLocalHashCodefinal的,用來區分不同的ThreadLocal實例。
ThreadLocal實例stringLocal創建完成后,調用set()方法時,

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);    if (map != null)
        map.set(this, value);    else
        createMap(t, value);
}

先獲取當前線程,即main線程,然后根據線程實例調用getMap()方法獲取ThreadLocalMap
其中getMap()方法如下:

ThreadLocalMap getMap(Thread t) {    return t.threadLocals;
}

getMap()方法直接返回線程的成員變量threadLocals,其中threadLocals變量是ThreadLocalMap類的實例,而ThreadLocalMapThreadLocal的內部類。
如果map(當前線程的成員變量threadLocals)存在,則將數據寫入到ThreadLoclMap用於存儲數據的Entry中。
ThreadLocalMapset方法如下:

private void set(ThreadLocal<?> key, Object value) {
    Entry[] tab = table;    int len = tab.length;    int i = key.threadLocalHashCode & (len-1);    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();        if (k == key) {
            e.value = value;            return;
        }        if (k == null) {
            replaceStaleEntry(key, value, i);            return;
        }
    }

    tab[i] = new Entry(key, value);    int sz = ++size;    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}

其中Entry定義如下

static class Entry extends WeakReference<ThreadLocal<?>> {    Object value;    Entry(ThreadLocal<?> k, Object v) {        super(k);
        value = v;
    }
}

keyThreadLocal實例,值是用戶定義的具體對象值。

如果map(當前線程的成員變量threadLocals)不存在,則創建一個ThreadLocalMap實例,並和線程的成員變量threadLocals關聯起來。其中ThreadLocalMap實例的keythis,即ThreadLocal實例stringLocal,值是用戶定義的具體對象值。

void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

總的來說,ThreadLocal的作用是提供線程內的局部變量,這種變量在線程的生命周期內起作用。作用:提供一個線程內公共變量(比如本次請求的用戶信息),減少同一個線程內多個函數或者組件之間一些公共變量的傳遞的復雜度,或者為線程提供一個私有的變量副本,這樣每一個線程都可以隨意修改自己的變量副本,而不會對其他線程產生影響。

其他內容待續......

本文參考

Java 7 Concurrency Cookbook

http://ifeve.com/concurrency-modle-seven-week-1/

http://tutorials.jenkov.com/java-concurrency/concurrency-models.html

關注大數據處理、數據挖掘 如需轉載,請注明出處:http://www.cnblogs.com/molyeo/

原文出處:https://www.cnblogs.com/molyeo/p/9530427.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM