Java線程並發控制基礎知識


微博上眾神推薦今年4月剛剛出版的一本書,淘寶華黎撰寫的《大型網站系統與Java中間件實踐》,一線工程師的作品,實踐出真知,果斷要看。

前兩章與《淘寶技術這十年》內容類似,基本是講從一個小網站如何慢慢升級成分布式網站,從第三章開始亮出干貨,個人感覺總結的很好,本文主要摘取並擴充下作者第三章的內容

作學習交流之用,非盈利性質

線程池、線程同步、互斥鎖、讀寫鎖、原子數、喚醒、通知、信號量、線程交換隊列

 

線程池

推薦用ThreadPoolExecutor的工廠構造類Executors來管理線程池,線程復用線程池開銷較每次申請新線程小,具體看代碼以及注釋

public class TestThread {
    /**
     * 使用線程池的方式是復用線程的(推薦)
     * 而不使用線程池的方式是每次都要創建線程
     * Executors.newCachedThreadPool(),該方法返回的線程池是沒有線程上限的,可能會導致過多的內存占用
     * 建議使用Executors.newFixedThreadPool(n)
     * 
     * 有興趣還可以看下定時線程池:SecheduledThreadPoolExecutor
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int nThreads = 5;
        
        /**
         * Executors是ThreadPoolExecutor的工廠構造方法
         */
        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
        
        //submit有返回值,而execute沒有返回值,有返回值方便Exception的處理
        Future res = executor.submit(new ConsumerThread());
        //executor.execute(new ConsumerThread());
        
        /**
         * shutdown調用后,不可以再submit新的task,已經submit的將繼續執行
         * shutdownNow試圖停止當前正執行的task,並返回尚未執行的task的list
         */
        executor.shutdown();
        
        //配合shutdown使用,shutdown之后等待所有的已提交線程運行完,或者到超時。繼續執行后續代碼
        executor.awaitTermination(1, TimeUnit.DAYS);
        
        //打印執行結果,出錯的話會拋出異常,如果是調用execute執行線程那異常會直接拋出,不好控制,submit提交線程,調用res.get()時才會拋出異常,方便控制異常
        System.out.println("future result:"+res.get());
    }
    
    static class ConsumerThread implements Runnable{

        @Override
        public void run() {
            for(int i=0;i<5;i++) {
                System.out.println(i);
            }
        }
    }
}
輸出:
0 1 2 3 4 future result:null

 

 線程同步

synchronized(this)和synchronized(MyClass.class)區別:前者與加synchronized的成員方法互斥,后者和加synchronized的靜態方法互斥

 

synchronized的一個應用場景是單例模式的,雙重檢查鎖

public class Singleton {  
    private volatile static Singleton singleton;  
    private Singleton (){}  
    public static Singleton getSingleton() {  
    if (singleton == null) {  
        synchronized (Singleton.class) {  
          if (singleton == null) {  
              singleton = new Singleton();  
          }  
        }  
    }  
    return singleton;  
    }  
}  

 

注意:不過雙重檢查鎖返回的實例可能是沒有構造完全的對象,高並發的時候直接使用有問題,不知道在新版的java里是否解決了

所以有了內部類方式的單例模式,這樣的單例模式有了延遲加載的功能(還有一種枚舉方式的單例模式,用的不多,有興趣的可以上網查)

//(推薦)延遲加載的單例模式
public
class Singleton { private static class SingletonHolder {   private static final Singleton INSTANCE = new Singleton(); } private Singleton (){} public static final Singleton getInstance() {   return SingletonHolder.INSTANCE; } }

 

若不要延遲加載,在類加載的時候實例化對象,那直接這么寫,如下:

public class Singleton {  
    private static Singleton instance = new Singleton();  
    private Singleton (){}  
    public static Singleton getInstance() {  
      return instance;  
    }  
}  

 

volatile保證同一變量在多線程中的可見性,所以它更多是用於修飾作為開關狀態的變量

用synchronized修飾變量的get和set方法,不但可以保證和volatile修飾變量一樣的效果(獲取最新值),因為synchronized不僅會把當前線程修改的變量的本地副本同步給主存,還會從主存中讀取數據更新本地副本。而且synchronized還有互斥的效果,可以有效控制並發修改一個值,因為synchronized保證代碼塊的串行執行。如果只要求獲取最新值的特性,用volatile就好,因為volatile比較輕量,性能較好。

互斥鎖、讀寫鎖

ReentrantLock 和 ReentrantReadWriteLock

JDK5增加了ReentrantLock這個類因為兩點:

1.ReentrantLock提供了tryLock方法,tryLock調用的時候,如果鎖被其他線程(同一個線程兩次調用tryLock也都返回true)持有,那么tryLock會立即返回,返回結果是false。lock()方法會阻塞。

2.構造RenntrantLock對象可以接收一個boolean類型的參數,描述鎖公平與否的函數。公平鎖的好處是等待鎖的線程不會餓死,但是整體效率相對低一些;非公平鎖的好處是整體效率相對高一些。

注意:使用ReentrantLock后,需要顯式地進行unlock,所以建議在finally塊中釋放鎖,如下:

lock.lock();
try {
     //do something  
}
finally {
     lock.unlock();  
}

ReentrantReadWriteLock與ReentrantLock的用法類似,差異是前者通過readLock()和writeLock()兩個方法獲得相關的讀鎖和寫鎖操作。

 

原子數

除了用互斥鎖控制變量的並發修改之外,jdk5中還增加了原子類,通過比較並交換(硬件CAS指令)來避免線程互斥等待的開銷,進而完成超輕量級的並發控制,一般用來高效的獲取遞增計數器。

AtomicInteger counter = new AtomicInteger();
counter.incrementAndGet();
counter.decrementAndGet();

可以簡單的理解為以下代碼,增加之后與原先值比較,如果發現增長不一致則循環這個過程。代碼如下

public class CasCounter {
    private SimulatedCAS value;
    public int getValue() {
        return value.getValue();
    }
    public int increment() {
        int oldValue = value.getValue();
        while (value.compareAndSwap(oldValue, oldValue + 1) != oldValue)
            oldValue = value.getValue();
        return oldValue + 1;
    }
}

可以看IBM工程師的一篇文章 Java 理論與實踐: 流行的原子

 

喚醒、通知

wait,notify,notifyAll是java的Object對象上的三個方法,多線程中可以用這些方法完成線程間的狀態通知。

notify是喚醒一個等待線程,notifyAll會喚醒所有等待線程。

CountDownLatch主要提供的機制是當多個(具體數量等於初始化CountDownLatch時的count參數的值)線程都到達了預期狀態或完成預期工作時觸發事件,其他線程可以等待這個事件來觸發后續工作。

舉個例子,大數據分拆給多個線程進行排序,比如主線程

CountDownLatch latch = new CountDownLatch(5);

for(int i=0;i<5;i++) {
    threadPool.execute(new MyRunnable(latch,datas));    
}

latch.await();

//do something 合並數據

MyRunnable的實現代碼如下

public void run() {
     //do something數據排序  
     latch.countDown(); 
   //繼續自己線程的工作,與CyclicBarrier最大的不同,稍后馬上講 }

 

CyclicBarrier循環屏障,協同多個線程,讓多個線程在這個屏障前等待,直到所有線程都到達了這個屏障時,再一起繼續執行后面的動作

 使用CyclicBarrier可以重寫上面的排序代碼

主線程如下

CyclicBarrier barrier = new CyclicBarrier(5+1); //主線程也要消耗一個await,所以+1

for(int i=0;i<5;i++) {
    threadPool.execute(new MyRunnable(barrier,datas));//如果線程池線程數過少,就會發生死鎖
}

barrier.await();
//合並數據

MyRunnable代碼如下

public void run() {
    //數據排序
    barrier.await();
}

//全部 count+1 await之后(包括主線程),之后的代碼才會一起執行

 

 信號量

Semaphore用於管理信號量,與鎖的最大區別是,可以通過令牌的數量,控制並發數量,當管理的信號量只有1個時,就退化到互斥鎖。

例如我們需要控制遠程方法的並發量,代碼如下

semaphore.acquire(count);
try {
     //調用遠程方法  
}
finally {
     semaphore.release(count);  
}

 

線程交換隊列

Exchanger用於在兩個線程之間進行數據交換,線程會阻塞在Exchanger的exchange方法上,直到另外一個線程也到了同一個Exchanger的exchanger方法時,二者進行交換,然后兩個線程繼續執行自身相關代碼。

public class TestExchanger {
    static Exchanger exchanger = new Exchanger();
    public static void main(String[] args) {
        new Thread() {
            public void run() {
                int a = 1;
                try {
                    a = (int) exchanger.exchange(a);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("Thread1: "+a);
            }
        }.start();
        
        new Thread() {
            public void run() {
                int a = 2;
                try {
                    a = (int) exchanger.exchange(a);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("Thread2: "+a);
            }
        }.start();
    }
}

輸出結果:

Thread2: 1
Thread1: 2

 

並發容器

CopyOnWrite思路是在更改容器時,把容器寫一份進行修改,保證正在讀的線程不受影響,適合應用在讀多寫少的場景,因為寫的時候重建一次容器。

以Concurrent開頭的容器盡量保證讀不加鎖,並且修改時不影響讀,所以會達到比使用讀寫鎖更高的並發性能

 

最后向大家推薦下,淘寶華黎的這本書《大型網站系統與Java中間件實戰》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM