Java並發編程快速學習


上周的面試中,被問及了幾個關於Java並發編程的問題,自己回答的都不是很系統和全面,可以說是“頭皮發麻”,哈哈。因此果斷購入《Java並發編程的藝術》一書,學習后的體會是要想快速上手Java並發編程,最需要掌握的是線程、線程池概念的理解和Executor框架的使用
Tip:
實踐請見github-multiThread,不會介紹Java內存模型等更底層的內容。看看下圖的“糙漢”身上錯綜復雜的線[程],願通過學習,能化繁為簡,[高效]的編出[高效]的多線程代碼

基本概念

在實踐中,為了更好的利用資源提高系統整體的吞吐量,會選擇並發編程。但由於上下文切換和死鎖等問題,並發編程不一定能提高性能,因此如何合理的進行並發編程時本文的重點,接下來介紹關於鎖最基本的一些知識(選學)。

  • volatile:輕量,保證共享變量的可見性,使得多個線程對共享變量的變更都能及時獲取到。其包括兩個子過程,將當前處理器緩存行的數據寫回到系統內存,之后會使其他CPU里緩存了該內存地址的數據無效。

  • synchronized:相對重量,其包含3種形式,針對普通同步方法,鎖是當前實例對象;針對靜態同步方法,鎖是當前類的Class對象;對於同步代碼塊,鎖是Synchonize括號內配置的對象。此外,synchronize用的鎖存在ava對象頭中,編譯后會插入類似monitorenter, monitorexit的代碼。

  • 鎖狀態:包括無鎖狀態,偏向鎖狀態,輕量級鎖狀態,重量級鎖狀態。Tip,鎖可以升級但不能降級。

  • Java實現原子操作:可以通過鎖和循環CAS來實現原子操作,不過其也存在3個問題,包括ABA問題,通過版本號解決;循環時間長開銷大,通過pause指令減少自旋帶來的開銷;只能保證一個共享變量的原子操作,通過AtomicRefence保證引用對象間的原子性,接下來看一個最簡單的CAS操作示例。

      protected void safeCount() {
      	for (;;) {
      		int i = atomicI.get();
      		if (atomicI.compareAndSet(i, ++i))
      			break;
      	}
      }
    

線程

這部分和之后的鎖是基礎部分的核心內容,需要好好理解。一般來說,線程都是操作系統最小的調度單元,一個進程中可以包含多個線程,每個線程都擁有自己的計數器、堆棧和局部變量。系統會采用分時的形式調度運行的線程,OS會分出一個個的時間片到線程,此外還可以給線程設置優先級,來保證優先級高的線程獲得更多的CPU時間。通過下面的示例代碼可以發現,java程序的運行不僅就是main線程,還有清楚Reference的線程、調用對象finalize方法的線程、分發處理發送給JVM信息的線程、Attach Listener線程等。

// 獲取管理線程的MXbean
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);
// 打印線程信息
for (ThreadInfo threadInfo : threadInfos) {
	System.out.println("[" + threadInfo.getThreadId() + "]" + threadInfo.getThreadName());
}
  • 線程的狀態:Java線程的整個生命周期包括6種不同狀態,分別是NEW初始狀態,線程被構建但未start;RUNNABLE運行狀態,Java線程將OS中的就緒和運行兩種狀態都稱作“運行中”;BLOCKED阻塞狀態,表示線程阻塞於鎖;WAITING等待狀態,表示線程進入等待狀態,進入該狀態表示當前線程需要等待其他線程做出特定動作(通知或中斷);TIME_WAITING超時等待狀態,該狀態不同於WAITING,其會在指定的時候后返回;TERMINATED終止狀態,可以使用interrupt()合理的終止線程,表示當前線程已經執行完畢,之后通過一張Java線程狀態圖來做個形象的了解。

    Daemon守護線程概念非常簡單,java的虛擬機只有在不存在Daemon線程時才會退出。

  • 線程間通信有一個的經典范式,等待/通知機制。一個線程修改了一個對象的值,而另一個線程感知到了變化,然后進行相應的操作,整個過程開始於一個線程,而最終執行的是另一個線程。

      //等待方:1.獲取對象的鎖 2.如果條件不滿足,那么調用對象的wait方法,被通知后要檢查條件
      //3.條件滿足則執行對應的邏輯
      synchronized(lock){
      while(!flag){lock.wait();}
      }
      //通知方:1.獲取對象的鎖 2.改變條件 3.通知所有等待在對象上線程
      synchronized(lock){
      flag = true;
      lock.notifyAll();
      }
    


如果線程A執行了Thread.join(),表示當線程A等待的線程終止之后才從thread.join()返回,其還提供了join(long millis)和join(int millis, int nanos)方法,當給點時間內前驅線程未結束則強制返回。
ThreadLocal線程變量是以ThreadLocal對象為鍵,任意對象為值的存儲結構。此外,這部分常見的應用實例包括等待超時模式,數據庫線程池,基於線程池的簡單Web服務器等。

鎖是用來控制多個線程訪問共享資源的方式,在Lock接口出現前都是通過synchronized來處理線程間同步問題。鎖的主要方法包括lock, tryLock, unlock, newCondition獲取等待通知組件等方法。其相關的實現包括隊列同步器AbstractQueuedSynchronizer、重入鎖ReentrantLock、讀寫鎖ReentrantReadWriteLock、LockSupport和Condition接口,這部分的重點講是可重入鎖ReenterLock

  • 重入鎖ReentrantLock表示該鎖可以支持一個線程對資源的重復加鎖,並支持獲取瑣時的公平性的選擇。默認是非公平鎖,其特點是性能要遠高於公平鎖(嚴格按照請求時間順序獲取所,FIFO)。

      ReentrantLock lock = new ReentrantLock(true);
      lock.lock();
      try {
      	// TODO
      } finally {
      	lock.unlock();
      }
    
  • 讀寫鎖ReentrantReadWriteLock同時維護一個讀鎖和一個寫鎖,允許多個讀線程同時訪問共享數據,只會在寫線程訪問時阻塞,和數據庫的鎖機制很類似,該方式使得並發性等到很大提升。其除了公平性選擇、可重入等特性外,還支持鎖降級,遵循獲取寫鎖、獲取讀鎖再釋放寫鎖的次序,寫鎖能降級為讀鎖。

  • LockSupport提供park阻塞,unpark喚醒的靜態方法。

  • Condition接口:任意的Java對象,都擁有一組監視器方法,包括wait()notify()等,這些方法與synchronized關鍵字配合可以實現等待/通知模式,Condition接口也提供了類似的監視器方法,但功能更加強大。

進階概念

並發容器和框架

  • ConcurrentHashMap VS HashTable:之所以決定好好學學Java並發編程,可以說就是面試時被面試官懟住這個問題。過去只知道ConcurrentHashMap是HashMap的線程安全版本,但其與HashTable的區別卻從來沒關心過。簡答來說,前者通過SegmentHashEntry進行包裝,達到了記錄級別的鎖粒度,和數據庫相關知識類似。HashTable由於只支持[表]級鎖,因此性能比較低下。ConcurrentLinkedQueue則是隊列的線程安全版本,沒有什么特別要說的。
  • BlockingQueue阻塞隊列是一種支持兩個附加操作的隊列,一個是支持阻塞插入,即當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿,另一個支持阻塞的移除方法,意思是隊列為空時,獲取元素的線程會等待隊列變為非空。其處理方式包括拋出異常、返回特殊值、一直阻塞和超時退出。Java7提供的阻塞隊列包括ArrayBlockingQueue,LinkedBlockingQueue,DelayQueue等,不是重點。
  • Fork/Join框架:Java7中提供的類似Map/Reduce的並行開發框架,Fork可以將任務分解為子任務,而Join則負責匯總結果。其中涉及一個工作竊取work-stealing算法,可以使得線程可以從其他隊列里竊取任務來執行,優點是充分利用線程進行並行計算,減少了線程間的競爭;缺點是在某些情況下存在競爭,比如雙端隊列里只有一個任務時,該算法會消耗更多的系統資源。

並發工具類

這部分的內容非常重要,之后介紹的一些常見模式可以很好的應用在日常的開發場景中,一定要掌握牢靠。

  • 13個原子操作類:比較常見的有AtomicBooleanAtomicInteger,AtomicIntegerArray,AutomicReference等,接下來選擇一個比較復雜的作為示例。

      User user = new User("xionger", 30);
      atomicUserRef.set(user);
      User updateUser = new User("xiongerda", 32);
      atomicUserRef.compareAndSet(user, updateUser);
      System.out.println(atomicUserRef.get().getName());
      System.out.println(atomicUserRef.get().getOld());
    
  • CountDownLatch:類似一個計數器,允許一個或多個線程等待其他線程完成操作,比如主線程需要等待2個子線程完成任務后返回。常見場景,比如我們解析Excel多個Sheet的數據,那么可以由每個線程處理一個,再都完成后再通知系統解析完成。

      static CountDownLatch latch = new CountDownLatch(3);
      public static void main(String[] args) throws InterruptedException {
      	new Thread(new Runnable() {
      		@Override
      		public void run() {
      			latch.countDown();
      		}
      	}).start();
      	new Thread(new Runnable() {
      		@Override
      		public void run() {
      			latch.countDown();
      		}
      	}).start();
      	latch.await();
      }
    
  • CyclicBarrier:其讓一組線程到達一個屏障,類似跑步的起跑線,直到最后一個線程到達屏障,屏障才會開門,所有被阻塞的線程才能繼續執行。以可用於多線程計算數據,最后合並計算數據的場景,例如用一個Excel保存用戶所有銀行流水,每個Sheet保存一個賬戶近一年的流水,現在要統計日均流水,那么可以先計算每個Sheet的日均流水,最后匯總。使用上和CountDownLatch有些相似,不過其特點是可以使用reset方法重置,並通過isBroken()判斷線程是否中斷。

  • Semaphore信號量用於控制同時訪問特定資源的線程數量,常用與流量控制,比如數據庫連接的控制,有50個線程需要使用15數據庫連接。

      private static ExecutorService executorService = Executors.newFixedThreadPool(50);
      	private static Semaphore sema = new Semaphore(15);
      
      	public static void main(String[] args) {
      		for (int i = 0; i < 50; i++) {
      			executorService.execute(new Runnable() {
      				@Override
      				public void run() {
      					try {
      						sema.acquire();
      						System.out.println("save data");
      						sema.release();
      					} catch (InterruptedException e) {
      						e.printStackTrace();
      					}
      				}
      			});
      		}
      		executorService.shutdown();
      	}
    
  • Exchanger交換者可用於線程間數據交換,它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數據。Exchange可以用於遺傳算法和校對工作等場景,比如需要將紙質流水錄入到系統,為了避免錯位,使用AB崗兩人進行錄入,錄入到Excel后,系統需要加載這兩個Excel並進行校對。

      private static final Exchanger<String> exchanger = new Exchanger<>();
      private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
    
      public static void main(String[] args) {
      	threadPool.execute(new Runnable() {
      		@Override
      		public void run() {
      			String a = "銀行流水A";
      			try {
      				exchanger.exchange(a);
      			} catch (InterruptedException e) {
      				e.printStackTrace();
      			}
      		}
      	});
      	threadPool.execute(new Runnable() {
      		@Override
      		public void run() {
      			String b = "銀行流水B";
      			try {
      				String a = exchanger.exchange(b);
      				System.out.println("a和b是否數據一致:" + a.equals(b) + ",a錄入的是: " + a + ",b錄入的是" + b);
      			} catch (InterruptedException e) {
      				e.printStackTrace();
      			}
      		}
      	});
      }
    

Executor框架

  • 線程池
    在介紹Executor框架前,先介紹線程池相關的原理,其是並發編程中最為重要的部分,合理的使用線程池可以降低系統消耗、提高響應速度、提高線程的可管理性,接下來介紹線程池的基礎處理流程。

    1.如果當前運行的線程少於corePoolSize直接創建新線程來執行任務,需要獲取全局鎖。
    2.如果運行的線程等於或多余corePoolSize則將任務加入BlockingQueue。
    3.如果由於隊列已滿,無法將任務加到BlockingQueue,則創建新的線程來處任務,需要獲取全局鎖。
    4.如果創建新線程將操作maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler.rejectedExecution()方法。
    ThreadPoolExecutor采用上述步驟,保證了執行execute()時,盡可能的避免了獲取全局鎖,大部分的可能都會執行步驟2,而無需獲取全局鎖。
    在引入Executor框架前,Java線程既是工作單元,也是執行機制。而在Executor框架中,工作單元和執行機制被分離開來,前者包括RunnableCallable,而執行機制由Executor框架提供。該框架是一個兩級的調度模型,在上層,通過調度器Executor將多個任務映射到固定數量的線程;在底層,操作系統內核將這些線程再映射到處理器上。而我們的應用程序只需通過E該框架控制上層的調度即可。
    Tip:
    合理配置線程池時,需要根據具體場景給出對應的解決方案,總體來說,推薦使用有界隊列,便於控制。
    CPU密集型:配置盡可能少的線程,如cpu數量+1,可以通過Runtime.getRuntime().availableProcessors()獲取CPU個數
    IO密集型:配置盡可能多的線程,如2*cpu數量,常見場景,等待數據庫或服務接口的返回。
    優先級:可以通過PriorityBlockingQueue來處理
    監控:可以通過taskCount,completedTaskCount,getActiveSize等函數來監控線程池的運行。

  • Executor框架結構主要由三部分組成
    a.任務,包括任務實現的接口RunnableCallable
    b.任務的執行,包括任務執行機制的核心接口Executor和其子類ExecutorService,相關的實現類包括ThreadPoolExecutorScheduledThreadPoolExecutor
    c.異步計算的結果,包括Future和其實現FutureTask

  • ThreadPoolExecutor:框架的核心類,由corePool, maximumPool, BlockingQueue, RejectedExecutionHandler4部分組成,可以由工具類Executors創建。具體老說,工具類可以創建FixedThreadPool固定線程數(最推薦)、SingleThreadExecutorCachedThreadPool三種類型的ThreadPoolExecutor

  • ScheduledThreadPoolExecutor:比基礎的Timer對象更加全面,其通過DelayQueue來執行周期性或定時的任務。

  • FutureTask基於AbstractQueuedSynchronizer(AQS),之前介紹的ReentrantLockCountDownLatch等其實都是基於AQS來實現的。AQS是一個同步框架,提供通用機制來原子性的管理同步狀態、阻塞&喚醒線程、維護被阻塞的線程隊列。每個基於AQS的實現都會包含兩類操作,acquire用於阻塞調用線程,對應futureTask.get(),知道AQS狀態允許這個線程才能繼續執行;另一個為release,對應futureTask.cancel()&run(),該操作改變AQS狀態,改變后的狀態允許一個或多個阻塞線程解除阻塞。

      public static void main(String[] args) throws InterruptedException, ExecutionException {
      	ExecutorService executor = Executors.newSingleThreadExecutor();
      	 Future<BigDecimal> result = executor.submit(new Callable<BigDecimal>() {
      		@Override
      		public BigDecimal call() throws Exception {
      			return getSalaryByService();
      		}
      	});
      	System.out.println(result.get());
      }
    
  • 生產者消費者模式:該模式可以解決大部分的並發問題,其通過阻塞隊列,平衡生產線程和消費線程的工作能力來提高程序整體處理數據的速度。比如經常會郵件來分享技術知識,就可以通過通過Job到郵箱中獲取到文章並放入阻塞隊列,之后消費者去獲取數據並插入到類似confluence的文檔管理工具中,接下來展示一個單個生產者,多個消費者的應用場景實現。
    Tip:
    線上問題定位:Linux中可以通過top命令查看進程的情況,之后可以使用交互命令1查看CPU性能,H查看每個線程的性能信息。
    性能測試:比如使用Jmeter來做壓測,可以通過netstat -nat | grep 3306 -c來查看數據的壓力情況。

參考資料

  1. 方騰飛. Java並發編程的藝術[M]. 上海:機械工業出版社, 2017.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM