微博上眾神推薦今年4月剛剛出版的一本書,淘寶華黎撰寫的《大型網站系統與Java中間件實踐》,一線工程師的作品,實踐出真知,果斷要看。
前兩章與《淘寶技術這十年》內容類似,基本是講從一個小網站如何慢慢升級成分布式網站,從第三章開始亮出干貨,個人感覺總結的很好,本文主要摘取並擴充下作者第三章的內容
作學習交流之用,非盈利性質
線程池、線程同步、互斥鎖、讀寫鎖、原子數、喚醒、通知、信號量、線程交換隊列
線程池
推薦用ThreadPoolExecutor的工廠構造類Executors來管理線程池,線程復用線程池開銷較每次申請新線程小,具體看代碼以及注釋
public class TestThread { /** * 使用線程池的方式是復用線程的(推薦) * 而不使用線程池的方式是每次都要創建線程 * Executors.newCachedThreadPool(),該方法返回的線程池是沒有線程上限的,可能會導致過多的內存占用 * 建議使用Executors.newFixedThreadPool(n) * * 有興趣還可以看下定時線程池:SecheduledThreadPoolExecutor */ public static void main(String[] args) throws InterruptedException, ExecutionException { int nThreads = 5; /** * Executors是ThreadPoolExecutor的工廠構造方法 */ ExecutorService executor = Executors.newFixedThreadPool(nThreads); //submit有返回值,而execute沒有返回值,有返回值方便Exception的處理 Future res = executor.submit(new ConsumerThread()); //executor.execute(new ConsumerThread()); /** * shutdown調用后,不可以再submit新的task,已經submit的將繼續執行 * shutdownNow試圖停止當前正執行的task,並返回尚未執行的task的list */ executor.shutdown(); //配合shutdown使用,shutdown之后等待所有的已提交線程運行完,或者到超時。繼續執行后續代碼 executor.awaitTermination(1, TimeUnit.DAYS); //打印執行結果,出錯的話會拋出異常,如果是調用execute執行線程那異常會直接拋出,不好控制,submit提交線程,調用res.get()時才會拋出異常,方便控制異常 System.out.println("future result:"+res.get()); } static class ConsumerThread implements Runnable{ @Override public void run() { for(int i=0;i<5;i++) { System.out.println(i); } } } }
輸出:
0 1 2 3 4 future result:null
線程同步
synchronized(this)和synchronized(MyClass.class)區別:前者與加synchronized的成員方法互斥,后者和加synchronized的靜態方法互斥
synchronized的一個應用場景是單例模式的,雙重檢查鎖
public class Singleton { private volatile static Singleton singleton; private Singleton (){} public static Singleton getSingleton() { if (singleton == null) { synchronized (Singleton.class) { if (singleton == null) { singleton = new Singleton(); } } } return singleton; } }
注意:不過雙重檢查鎖返回的實例可能是沒有構造完全的對象,高並發的時候直接使用有問題,不知道在新版的java里是否解決了
所以有了內部類方式的單例模式,這樣的單例模式有了延遲加載的功能(還有一種枚舉方式的單例模式,用的不多,有興趣的可以上網查)
//(推薦)延遲加載的單例模式
public class Singleton { private static class SingletonHolder { private static final Singleton INSTANCE = new Singleton(); } private Singleton (){} public static final Singleton getInstance() { return SingletonHolder.INSTANCE; } }
若不要延遲加載,在類加載的時候實例化對象,那直接這么寫,如下:
public class Singleton { private static Singleton instance = new Singleton(); private Singleton (){} public static Singleton getInstance() { return instance; } }
volatile保證同一變量在多線程中的可見性,所以它更多是用於修飾作為開關狀態的變量
用synchronized修飾變量的get和set方法,不但可以保證和volatile修飾變量一樣的效果(獲取最新值),因為synchronized不僅會把當前線程修改的變量的本地副本同步給主存,還會從主存中讀取數據更新本地副本。而且synchronized還有互斥的效果,可以有效控制並發修改一個值,因為synchronized保證代碼塊的串行執行。如果只要求獲取最新值的特性,用volatile就好,因為volatile比較輕量,性能較好。
互斥鎖、讀寫鎖
ReentrantLock 和 ReentrantReadWriteLock
JDK5增加了ReentrantLock這個類因為兩點:
1.ReentrantLock提供了tryLock方法,tryLock調用的時候,如果鎖被其他線程(同一個線程兩次調用tryLock也都返回true)持有,那么tryLock會立即返回,返回結果是false。lock()方法會阻塞。
2.構造RenntrantLock對象可以接收一個boolean類型的參數,描述鎖公平與否的函數。公平鎖的好處是等待鎖的線程不會餓死,但是整體效率相對低一些;非公平鎖的好處是整體效率相對高一些。
注意:使用ReentrantLock后,需要顯式地進行unlock,所以建議在finally塊中釋放鎖,如下:
lock.lock(); try { //do something } finally { lock.unlock(); }
ReentrantReadWriteLock與ReentrantLock的用法類似,差異是前者通過readLock()和writeLock()兩個方法獲得相關的讀鎖和寫鎖操作。
原子數
除了用互斥鎖控制變量的並發修改之外,jdk5中還增加了原子類,通過比較並交換(硬件CAS指令)來避免線程互斥等待的開銷,進而完成超輕量級的並發控制,一般用來高效的獲取遞增計數器。
AtomicInteger counter = new AtomicInteger(); counter.incrementAndGet(); counter.decrementAndGet();
可以簡單的理解為以下代碼,增加之后與原先值比較,如果發現增長不一致則循環這個過程。代碼如下
public class CasCounter { private SimulatedCAS value; public int getValue() { return value.getValue(); } public int increment() { int oldValue = value.getValue(); while (value.compareAndSwap(oldValue, oldValue + 1) != oldValue) oldValue = value.getValue(); return oldValue + 1; } }
可以看IBM工程師的一篇文章 Java 理論與實踐: 流行的原子
喚醒、通知
wait,notify,notifyAll是java的Object對象上的三個方法,多線程中可以用這些方法完成線程間的狀態通知。
notify是喚醒一個等待線程,notifyAll會喚醒所有等待線程。
CountDownLatch主要提供的機制是當多個(具體數量等於初始化CountDownLatch時的count參數的值)線程都到達了預期狀態或完成預期工作時觸發事件,其他線程可以等待這個事件來觸發后續工作。
舉個例子,大數據分拆給多個線程進行排序,比如主線程
CountDownLatch latch = new CountDownLatch(5); for(int i=0;i<5;i++) { threadPool.execute(new MyRunnable(latch,datas)); } latch.await(); //do something 合並數據
MyRunnable的實現代碼如下
public void run() { //do something數據排序 latch.countDown();
//繼續自己線程的工作,與CyclicBarrier最大的不同,稍后馬上講 }
CyclicBarrier循環屏障,協同多個線程,讓多個線程在這個屏障前等待,直到所有線程都到達了這個屏障時,再一起繼續執行后面的動作。
使用CyclicBarrier可以重寫上面的排序代碼
主線程如下
CyclicBarrier barrier = new CyclicBarrier(5+1); //主線程也要消耗一個await,所以+1 for(int i=0;i<5;i++) { threadPool.execute(new MyRunnable(barrier,datas));//如果線程池線程數過少,就會發生死鎖 } barrier.await(); //合並數據
MyRunnable代碼如下
public void run() { //數據排序 barrier.await(); } //全部 count+1 await之后(包括主線程),之后的代碼才會一起執行
信號量
Semaphore用於管理信號量,與鎖的最大區別是,可以通過令牌的數量,控制並發數量,當管理的信號量只有1個時,就退化到互斥鎖。
例如我們需要控制遠程方法的並發量,代碼如下
semaphore.acquire(count); try { //調用遠程方法 } finally { semaphore.release(count); }
線程交換隊列
Exchanger用於在兩個線程之間進行數據交換,線程會阻塞在Exchanger的exchange方法上,直到另外一個線程也到了同一個Exchanger的exchanger方法時,二者進行交換,然后兩個線程繼續執行自身相關代碼。
public class TestExchanger { static Exchanger exchanger = new Exchanger(); public static void main(String[] args) { new Thread() { public void run() { int a = 1; try { a = (int) exchanger.exchange(a); } catch (Exception e) { e.printStackTrace(); } System.out.println("Thread1: "+a); } }.start(); new Thread() { public void run() { int a = 2; try { a = (int) exchanger.exchange(a); } catch (Exception e) { e.printStackTrace(); } System.out.println("Thread2: "+a); } }.start(); } }
輸出結果:
Thread2: 1
Thread1: 2
並發容器
CopyOnWrite思路是在更改容器時,把容器寫一份進行修改,保證正在讀的線程不受影響,適合應用在讀多寫少的場景,因為寫的時候重建一次容器。
以Concurrent開頭的容器盡量保證讀不加鎖,並且修改時不影響讀,所以會達到比使用讀寫鎖更高的並發性能
最后向大家推薦下,淘寶華黎的這本書《大型網站系統與Java中間件實戰》