Java並發編程-並發工具類及線程池


  JUC中提供了幾個比較常用的並發工具類,比如CountDownLatch、CyclicBarrier、Semaphore。

CountDownLatch:

  countdownlatch是一個同步工具類,它允許一個或多個線程一直等待,直到其他線程的操作執行完畢再執行。從命名可以解讀到countdown是倒數的意思,類似於我們倒計時的概念。

  countdownlatch提供了兩個方法,一個是countDown(),一個是await(), countdownLatch 初始化的時候需要傳入一個整數,在這個整數倒數到0之前,調用了await方法的程序都必須要等待,然后通過countDown() 來倒數。

  Demo :

public class CountDownLatchDemo {
	public static void main(String[] args) throws InterruptedException {
		CountDownLatch countDownLatch = new CountDownLatch(3);
		System.out.println("計數器為"+countDownLatch.getCount());
		new Thread(() -> {
			countDownLatch.countDown();
			System.out.println(Thread.currentThread().getName() +"執行完畢,計數器為"+countDownLatch.getCount());
		}, "t1").start();
		new Thread(() -> {
			countDownLatch.countDown();
			System.out.println(Thread.currentThread().getName() +"執行完畢,計數器為"+countDownLatch.getCount());
		}, "t2").start();
		new Thread(() -> {
			countDownLatch.countDown();
			System.out.println(Thread.currentThread().getName() +"執行完畢,計數器為"+countDownLatch.getCount());
		}, "t3").start();
		countDownLatch.await();//阻塞
		System.out.println("所有線程執行完畢");
	}
}

  執行結果 :

  從代碼的實現來看,有點類似join的功能,但是比join更加靈活。CountDownLatch構造函數會接收一個int類型的參數作為計數器的初始值,當調用CountDownLatch的countDown方法時,這個計數器就會減一。通過await方法去阻塞主流程:

  CountDownLatch類存在一個內部類Sync,它是一個同步工具,一定繼承了AbstractQueuedSynchronizer。很顯然,CountDownLatch實際上是是使得線程阻塞了,既然涉及到阻塞,就一定涉及到AQS隊列。await() 函數會使得當前線程在countdownLatch倒計時到0之前一直等待,除非線程中斷;從源碼中可以得知await方法會轉發到Sync的acquireSharedInterruptibly。

  acquireSharedInterruptibly():

  這塊代碼主要是判斷當前線程是否獲取到了共享鎖,AQS有兩種鎖類型,一種是共享鎖,一種是獨占鎖,在這里用的是共享鎖; 為什么要用共享鎖,因為CountDownLatch可以多個線程同時通過。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  if (Thread.interrupted()) //判斷線程是否中斷
    throw new InterruptedException();
  if (tryAcquireShared(arg) < 0) //如果等於0則返回1,否則返回-1,返回-1表示需要阻塞
    doAcquireSharedInterruptibly(arg);
}
//在這里,state的意義是count,如果計數器為0,表示不需要阻塞,否則,只有在滿足條件的情況下才會被喚醒
protected int tryAcquireShared(int acquires) {
  return (getState() == 0) ? 1 : -1;
}

  doAcquireSharedInterruptibly():獲取共享鎖

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  final Node node = addWaiter(Node.SHARED); //創建一個共享模式的節點添加到隊列中
  boolean failed = true;
  try {
    for (;;) { //自旋等待共享鎖釋放,也就是等待計數器等於0。
      final Node p = node.predecessor(); //獲得當前節點的前一個節點
      if (p == head) {
        int r = tryAcquireShared(arg);//就判斷嘗試獲取鎖,判斷計數器是否歸零
        if (r >= 0) {//r>=0表示計數器已經歸零了,則釋放當前的共享鎖
          setHeadAndPropagate(node, r);
          p.next = null; // help GC
          failed = false;
          return;
        }
      }
      //當前節點不是頭節點,則嘗試讓當前線程阻塞,第一個方法是判斷是否需要阻塞,第二個方法是阻塞
      if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
          throw new InterruptedException();
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

  setHeadAndPropagate(Node node, int propagate):

private void setHeadAndPropagate(Node node, int propagate) {
  Node h = head; // 記錄頭節點
  setHead(node); //設置當前節點為頭節點
  //前面傳過來的propagate是1,所以會進入下面的代碼
  if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next; //獲得當前節點的下一個節點,如果下一個節點是空表示當前節點為最后一個節點,或者下一個節點是share節點
    if (s == null || s.isShared())
      doReleaseShared(); //喚醒下一個共享節點
  }
}

  doReleaseShared():釋放共享鎖,通知后面的節點

private void doReleaseShared() {
  for (;;) {
    Node h = head; //獲得頭節點
    if (h != null && h != tail) { //如果頭節點不為空且不等於tail節點
      int ws = h.waitStatus;
      if (ws == Node.SIGNAL) { //頭節點狀態為SIGNAL,
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //修改當前頭節點的狀態為0,避免下次再進入到這個里面
          continue; // loop to recheck cases
        unparkSuccessor(h); //釋放后續節點
      }
      else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue; // loop on failed CAS
      }
      if (h == head) // loop if head changed
        break;
    }
}

  countdown(): 以共享模式釋放鎖,並且會調用tryReleaseShared函數,根據判斷條件也可能會調用doReleaseShared函數

public final boolean releaseShared(int arg) {
  if (tryReleaseShared(arg)) { //如果為true,表示計數器已歸0了
    doReleaseShared(); //喚醒處於阻塞的線程
    return true;
  }
  return false;
}

  tryReleaseShared():這里主要是對state做原子遞減,其實就是我們構造的CountDownLatch的計數器,如果等於0返回 false,也就是意味着已經不需要做doReleaseShared 了。當計數器倒數到最后1個的時候 運行到本方法后會返回 true,然后進入釋放鎖

protected boolean tryReleaseShared(int releases) {
  // Decrement count; signal when transition to zero
  for (;;) {
    int c = getState();
    if (c == 0)
      return false;
    int nextc = c-1;
    if (compareAndSetState(c, nextc))
      return nextc == 0;
  }
}

Semaphore(信號量):

  semaphore也就是我們常說的信號燈,semaphore可以控制同時訪問的線程個數,通過acquire獲取一個許可,如果沒有就等待,通過release釋放一個許可。有點類似限流的作用。叫信號燈的原因也和他的用處有關,比如某商場就5個停車位,每個停車位只能停一輛車,如果這個時候來了10輛車,必須要等前面有空的車位才能進入。先來看一下一個案例,簡單實現一下限流的功能:

public class SemaphoreDemo {

	public static void main(String[] args) {
		Semaphore semaphore = new Semaphore(5);
		for (int i = 0; i < 10; i++) {
			new Car(i, semaphore).start();
		}
	}

	static class Car extends Thread {
		private int num;
		private Semaphore semaphore;

		public Car(int num, Semaphore semaphore) {
			this.num = num;
			this.semaphore = semaphore;
		}

		public void run() {
			try {
				semaphore.acquire();// 獲取一個許可
				System.out.println("第" + num + "占用一個停車位");
				TimeUnit.SECONDS.sleep(2);
				System.out.println("第" + num + "倆車走嘍");
				semaphore.release();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

  semaphore也是基於AQS來實現的,內部使用state表示許可數量;它的實現方式和CountDownLatch的差異點在於acquireSharedInterruptibly中的tryAcquireShared方法的實現,這個方法是在Semaphore方法中重寫的,其他的源碼都是一樣的,根本原理還是基於AQS隊列,然后基於lockSupport 調用unsafe 的底層操作對線程進行 park/unpark 操作。

  在semaphore中存在公平和非公平的方式,和重入鎖是一樣的,如果通過FairSync表示公平的信號量、NonFairSync表示非公平的信號量;公平和非公平取決於是否按照FIFO隊列中的順序去分配Semaphore所維護的許可,非公平鎖通過自選的方式,類似於synchronized底層偏向鎖的意思,去搶占執行權限,而公平鎖的實現中則是判斷是否為頭節點,只有頭節點才能去搶占。

原子操作:

  當在多線程情況下,同時更新一個共享變量,由於我們前面講過的原子性問題,可能得不到預期的結果。如果要達到期望的結果,可以通過synchronized來加鎖解決,因為synchronized會保證多線程對共享變量的訪問進行排隊。

  在Java5以后,提供了原子操作類,這些原子操作類提供了一種簡單、高效以及線程安全的更新操作。而由於變量的類型很多,所以Atomic一共提供了12個類分別對應四種類型的原子更新操作,基本類型、數組類型、引用類型、屬性類型

基本類型對應:AtomicBoolean、AtomicInteger、AtomicLong

數組類型對應:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray

引用類型對應:AtomicReference、AtomicReferenceFieldUpdater、AtomicMarkableReference

字段類型對應:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicStampedReference

  基本操作案例:經過原子操作最后會輸出 1000.

public class AtomicDemo {

	private static AtomicInteger count = new AtomicInteger(0);

	public static synchronized void inc() {
		try {
			Thread.sleep(1);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		count.getAndIncrement();
	}

	public static void main(String[] args) throws InterruptedException {
		for (int i = 0; i < 1000; i++) {
			new Thread(() -> {
				AtomicDemo.inc();
			}).start();
		}
		Thread.sleep(4000);
		System.out.println(count.get());
	}
}

  AtomicInteger實現原理:由於所有的原子操作類都是大同小異的,所以我們只分析其中一個原子操作類

public final int getAndIncrement() {
  return unsafe.getAndAddInt(this, valueOffset, 1);
}

  又是熟悉的 unsafe ,提供了一些底層操作如直接內存訪問、線程調度等,然后調用unsafe類中的getAndAddInt方法,這個方法如下:

public final int getAndAddInt(Object var1, long var2, int var4) {
  int var5;
  do {
    var5 = this.getIntVolatile(var1, var2);// 方法獲取對象中offset偏移地址對應的整型field的值
  } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
  return var5;
}

  通過循環以及cas的方式實現原子更新,從而達到在多線程情況下仍然能夠保證原子性的目的。

 線程池:

  Java中的線程池是運用場景最多的並發框架,幾乎所有需要異步或並發執行任務的程序都可以使用線程池。線程池就像數據庫連接池的作用類似,只是線程池是用來重復管理線程避免創建大量線程增加開銷。所以合理的使用線程池可以

1. 降低創建線程和銷毀線程的性能開銷

2. 合理的設置線程池大小可以避免因為線程數超出硬件資源瓶頸帶來的問題,類似起到了限流的作用;線程是稀缺資源,如果無線創建,會造成系統穩定性問題線程池的使用

  JDK 為我們內置了幾種常見線程池的實現,均可以使用 Executors 工廠類創建為了更好的控制多線程,JDK提供了一套線程框架Executor,幫助開發人員有效的進行線程控制。它們都在

java.util.concurrent包中,是JDK並發包的核心。其中有一個比較重要的類:Executors,他扮演着線程工廠的角色,我們通過Executors可以創建特定功能的線程池

1.newFixedThreadPool:該方法返回一個固定數量的線程池,線程數不變,當有一個任務提交時,若線程池中空閑,則立即執行,若沒有,則會被暫緩在一個任務隊列中,等待有空閑的線程去執行。

  FixedThreadPool 的核心線程數和最大線程數都是指定值,也就是說當線程池中的線程數超過核心線程數后,任務都會被放到阻塞隊列中。另外 keepAliveTime 為 0,也就是超出核心線程數量以外的線程空余存活時間而這里選用的阻塞隊列是 LinkedBlockingQueue,使用的是默認容量 Integer.MAX_VALUE,相當於沒有上限這個線程池執行任務的流程如下:

\1. 線程數少於核心線程數,也就是設置的線程數時,新建線程執行任務

\2. 線程數等於核心線程數后,將任務加入阻塞隊列

\3. 由於隊列容量非常大,可以一直添加

\4. 執行完任務的線程反復去隊列中取任務執行

用途:FixedThreadPool 用於負載比較大的服務器,為了資源的合理利用,需要限制當前線程數量

2.newSingleThreadExecutor: 創建一個線程的線程池,若空閑則執行,若沒有空閑線程則暫緩在任務隊列中。

  創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行.

3.newCachedThreadPool:返回一個可根據實際情況調整線程個數的線程池,不限制最大線程數量,若用空閑的線程則執行任務,若無任務則不創建線程。並且每一個空閑線程會在60秒后自動回收

  CachedThreadPool 創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程; 並且沒有核心線程,非核心線程數無上限,但是每個空閑的時間只有 60

秒,超過后就會被回收。它的執行流程如下:

\1. 沒有核心線程,直接向 SynchronousQueue 中提交任務

\2. 如果有空閑線程,就去取出任務執行;如果沒有空閑線程,就新建一個

\3. 執行完任務的線程有 60 秒生存時間,如果在這個時間內可以接到新任務,就可以繼續活下去,否則就被回收

4.newScheduledThreadPool: 創建一個可以指定線程的數量的線程池,但是這個線程池還帶有延遲和周期性執行任務的功能,類似定時器。

submit和execute的區別:

  執行一個任務,可以使用submit和execute,這兩者有什么區別呢?

\1. execute只能接受Runnable類型的任務

\2. submit不管是Runnable還是Callable類型的任務都可以接受,但是Runnable返回值均為void,所以使用Future的get()獲得的還是null

  在 https://www.cnblogs.com/wuzhenzhao/p/9928639.html 中我介紹了各種線程池的使用案例及使用場景,前面說的四種線程池構建工具,都是基於ThreadPoolExecutor 類,看看它的構造函數參數:

public ThreadPoolExecutor(int corePoolSize, //核心線程數量
  int maximumPoolSize, //最大線程數
  long keepAliveTime, //超時時間,超出核心線程數量以外的線程空余存活時間
  TimeUnit unit, //存活時間單位
  BlockingQueue<Runnable> workQueue, //保存執行任務的隊列
  ThreadFactory threadFactory,//創建新線程使用的工廠
  RejectedExecutionHandler handler //當任務無法執行的時候的處理方式
) {
  this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  Executors.defaultThreadFactory(), defaultHandler);
}

  ThreadPoolExecutor是線程池的核心,提供了線程池的實現。ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,並另外提供一些調度方法以支持定時和周期任務。Executers是工具類,主要用來創建線程池對象我們把一個任務提交給線程池去處理的時候,線程池的處理過程是什么樣的呢?

  線程池用一個AtomicInteger來保存 [線程數量] 和 [線程池狀態] ,一個int數值一共有32位,高3位用於保存運行狀態,低29位用於保存線程數量,下面是他的一些成員變量屬性的定義:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //一個原子操作類
private static final int COUNT_BITS = Integer.SIZE - 3; //32-3
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //將1的二進制向右位移29位,再減1表示最大線程容量
//運行狀態保存在int值的高3位 (所有數值左移29位)
private static final int RUNNING = -1 << COUNT_BITS;// 接收新任務,並執行隊列中的任務
private static final int SHUTDOWN = 0 << COUNT_BITS;// 不接收新任務,但是執行隊列中的任務
private static final int STOP = 1 << COUNT_BITS;// 不接收新任務,不執行隊列中的任務,中斷正在執行中的任務
private static final int TIDYING = 2 << COUNT_BITS; //所有的任務都已結束,線程數量為0,處於該狀態的線程池即將調用terminated()方法
private static final int TERMINATED = 3 << COUNT_BITS;// terminated()方法執行完成
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; } //獲取運行狀態
private static int workerCountOf(int c) { return c & CAPACITY; } //獲取線程數量

  先看一下 execute 的源碼:

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  int c = ctl.get();
  if (workerCountOf(c) < corePoolSize) {//1.當前池中線程比核心數少,新建一個線程執行任務
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }
  if (isRunning(c) && workQueue.offer(command)) {//2.核心池已滿,但任務隊列未滿,添加到隊列中
    int recheck = ctl.get();
    //任務成功添加到隊列以后,再次檢查是否需要添加新的線程,因為已存在的線程可能被銷毀了
    if (! isRunning(recheck) && remove(command))
      reject(command);//如果線程池處於非運行狀態,並且把當前的任務從任務隊列中移除成功,則拒絕該任務
    else if (workerCountOf(recheck) == 0)//如果之前的線程已被銷毀完,新建一個線程
      addWorker(null, false);
  }
  else if (!addWorker(command, false)) //3.核心池已滿,隊列已滿,試着創建一個新線程
    reject(command); //如果創建新線程失敗了,說明線程池被關閉或者線程池完全滿了,拒絕任務
}

  這里的主要邏輯用流程圖表示如下:

  創建核心線程的時候是通過 addWorker(command, true) 方法把當前需要執行的任務放進去,通過構造一個 Worker 對象,先是通過雙層的自旋循化去判斷線程的狀態及當前核心線程的數量,最后通過CAS操作遞增了線程計器compareAndIncrementWorkerCount(c),最后調用start方法啟動一個線程,如果核心線程數滿了,則通過判斷線程是否運行,然后將任務丟進 workQueue,然后判斷是否超出最大線程數跟是否允許被執行,在前面的任務執行完的時候他就將被執行。

  這里的飽和策略 reject 有4個基本實現:

  從上至下分別是:

1.該策略是線程池的默認策略。使用該策略時,如果線程池隊列滿了丟掉這個任務並且拋出RejectedExecutionException異常。

2.這個策略和AbortPolicy的slient版本,如果線程池隊列滿了,會直接丟掉這個任務並且不會有任何異常。

3. 這個策略從字面上也很好理解,丟棄最老的。也就是說如果隊列滿了,會將最早進入隊列的任務刪掉騰出空間,再嘗試加入隊列。因為隊列是隊尾進,隊頭出,所以隊頭元素是最老的,因此每次都是移除對頭元素后再嘗試入隊。

4.使用此策略,如果添加到線程池失敗,那么主線程會自己去執行該任務,不會等待線程池中的線程去執行。就像是個急脾氣的人,我等不到別人來做這件事就干脆自己干。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM