多線程編程(2)—線程安全


1. 問題的引出 

 線程安全歸根結底可以說是內存安全,在jvm內存模型中,有一塊特殊的公共內存空間,稱為堆內存,進程內的所有線程都可以訪問並修改其中的數據,就會造成潛在的問題。因為堆內存空間在沒有保護機制的情況下,你放進去的數據,可能被別的線程篡改。如下代碼:

public class ThreadSafe implements Runnable {
    private static int count = 0;
    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            count++;
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 20; i++) {
            es.execute(new ThreadSafe());
        }
        es.shutdown();  //不允許添加線程到線程池,異步關閉連接池
        es.awaitTermination(10L, TimeUnit.SECONDS); //等待連接池的線程任務完成
        System.out.println(count);
    }
}

 本來期望的值是20000,可是最終輸出的結果卻一點在變化,其值總是小於等於20000,顯然這是由於線程不安全造成的,多個線程並發的去訪問全局變量、靜態變量、文件、設備、套接字等都可能出現這種問題。


2. 線程同步的措施

 為了協調和配合線程之間對共享資源的訪問,通常有四種方式:

   1. 臨界區:訪問某一段臨界資源的代碼片段,與共享資源類似,但有一點不同的是,某一時刻只允許一個線程去訪問(對應java中的關鍵字 synchronized包含的代碼)。

  2. 互斥量:互斥量是一個對象,只有都擁有互斥量的對象才可以訪問共享資源。而且互斥量中只有一個,通常互斥量的實現是通過鎖來實現的,而且加鎖操作和釋放操作只能由同一個線程來完成。此處與臨界區的區別是一段代碼,通常存在與一個文件中,而互斥量是一個對象,加鎖操作和解鎖操作可以在不同的文件去編寫,只要是同一個線程就好。

  3. 信號量: 信號量可以允許指定數量的線程同一時刻去訪問共享資源,當線程數達到了閾值后,將阻止其他線程的訪問,最常見的比如生產者和消費者問題。信號量和互斥量的區別則是信號量的發出和釋放可以由不同線程來完成

  4. 事件:通過發送通知的形式來實現線程同步,可以實現不同進程中的線程同步操作


 3.飢餓與死鎖

  飢餓:某些線程或進程由於長期得不到資源,而總是處於就緒或者阻塞狀態。例如:

  ①. 該進程或線程所擁有的CPU時間片被其他線程搶占而得不到執行(通常是優先級比它高的線程或進程),一直處於就緒狀態。

  ②. 由於選用不恰當的調度算法,導致該進程或線程長期無法得到CPU時間片,處於就緒狀態。

  ③. 由於喚醒的時間把握不對,喚醒線程時,所需的資源處於被鎖定狀態,導致線程回到阻塞狀態。

  死鎖:兩個或多個線程在執行過程中,由於相互占有對方所需的資源而又互不相讓從而造成這些線程都被阻塞,若無外力的作用下,他們都將無法執行下去。例如

  ①. 進程推進順序不合適。互相占有彼此需要的資源,同時請求對方占有的資源,形成循環依賴的關系。

  ②. 資源不足。

  ③. 進程運行推進順序與速度不同,也可能產生死鎖。

  一些避免死鎖的措施:

  1. 不要在鎖區域內在加把鎖,即不要在釋放鎖之前競爭其他鎖。

  2. 減小鎖粒度,即減小線程加鎖的范圍,真正需要的時候再去加鎖。

  3. 順序訪問共享資源。

  4. 設置超時機制,超過指定時間則程序返回錯誤。

  5. 競爭鎖期間,允許程序被中斷。


 4.代碼層面解決線程安全

 解決線程安全主要考慮三方面:

  1. 可見性:當多個線程並發的讀寫某個共享資源的時候,每個線程總是可以取到該共享資源的最新數據。

  2. 原子性:某線程對一個或者多個共享資源所做的一連串寫操作不會被中斷,在此期間不會有其他線程同時對這些共享資源進行寫操作。

  3. 有序性:單個線程內的操作必須是有序的。

通常原子性都可以得到保證,問題的病端就出在可見性和原子性。

4.1 可見性的問題

  如下實例程序,按通常的理解來說,當主線程等待一秒后,把flag的值修改為true后,另外一個線程應該可以感知到,然后跳過while循環,直接打印出后面的數據,可是最結果卻一直卡在了while循環里。

public class Thread4 implements Runnable{
    private static boolean flag = false;
    @Override
    public void run() {
        System.out.println("waiting for data....");     
        while (!flag);
        System.out.println("cpying with data");
    }

    public static void main(String[] args) throws InterruptedException {
        Thread4 thread4 = new Thread4();
        Thread t = new Thread(thread4);
        t.start();
        Thread.sleep(1000);
        flag = true;
    }
}
/* output
 * waiting for data....
 */

  主要的原因是java程序在jvm上運行的時候,該程序所占用的內存分為兩類主內存和工作內存(邏輯上的內存,實際上是cpu的寄存器和高速緩存,因為,cpu在計算的時候,並不總是從內存讀取數據,它的數據讀取順序優先級是:寄存器-高速緩存-內存。CPU和內存之間通過總線進行)。也就是在主線程中啟動另一個線程t會開辟出一個新的工作內存,與主線程的工作內存相互獨立,且線程之間無法直接通信,只能去主內存讀取全局變量,而線程t中做while判斷的時候並不會去讀取主內存的flag值,致使線程t無法被感知到flag在其他線程被改變,可以做一個測試,現在把run函數改成如下形式: 

public void run() {
        System.out.println("waiting for data....");
        /*  Notice
            如果在while循環里加上System.out.println(flag);語句,則不會使用本工作內存的flag數據,
            而是重新去主內存加載數據
         */
        while (!flag){
            System.out.println(flag);     //測試,可以做到線程的可見性
        }
        System.out.println("cpying with data");
 }
/* output
 * waiting for data....
 * false
 * ...
 * false
 * cpying with data
 */

 為了感知其他線程中一些全局變量值的變化,而且避免頻繁去測試主內存中的數據變化,保證線程之間的可見性,可以使用volatile關鍵字去修飾全局變量,如下:

public class Thread4 implements Runnable{
    private volatile static boolean flag = false;
    @Override
	public void run() {
        System.out.println("waiting for data....");
        while (!flag);
        System.out.println("cpying with data");
    }
    //....
    
 }
/* output
 * waiting for data....
 * cpying with data
 */

volatile關鍵字借助MESI一致性協議,會在工作內存(CPU的寄存器等)與主內存連接的總線上建立一道總線嗅探機制,一旦發現其他線程修改了主內存中的某個全局變量(即圖中橙灰色線條讀取的數據以及寫回的數據),就會讓其他工作線程中從主內存拷貝出來的副本變量失效(即圖中紫色的線條讀取的數據),從而會使左邊的線程重新去讀取數據(即圖中紅色的線條讀取的數據)。如下圖:

  雖然解決了原子性問題,可是volatile關鍵字不支持原子性操作,如下程序:

public class Thread5 implements Runnable {
    private static volatile int count = 0;
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            count++;
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 20; i++) {
            es.execute(new Thread5());
        }
        es.shutdown();  //不允許添加線程,異步關閉連接池
        es.awaitTermination(10L, TimeUnit.SECONDS); //等待連接池的線程任務完成
        System.out.println(count);
    }
}
/* output
 * 175630
 */

4.2 原子性問題

 針對原子性問題,我們可以使用熟悉的synchronized關鍵字,synchronized關鍵字最主要有以下3種應用方式:

  • 修飾實例方法,作用於當前實例加鎖,進入同步代碼前要獲得當前實例的鎖

  • 修飾靜態方法,作用於當前類對象加鎖,進入同步代碼前要獲得當前類對象的鎖

  • 修飾代碼塊,指定加鎖對象,對給定對象加鎖,進入同步代碼庫前要獲得給定對象的鎖。

部分示例代碼如下:

public class Thread5 implements Runnable {
    private static int count = 0;
    public synchronized static void add() {
        count++;
    }
    @Override
    public void run() {
        for (int i = 0; i < 1000000; i++) {
//          add()
            synchronized (Thread5.class){
                count++;
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 20; i++) {
            es.execute(new Thread5());
        }
        es.shutdown();  //不允許添加線程,異步關閉連接池
        es.awaitTermination(10L, TimeUnit.SECONDS); //等待連接池的線程任務完成
        System.out.println(count);
    }
}	
/* output
 * 20000000
 */

然而synchronized是一種悲觀鎖,具有強烈的獨占和排他特性,它頻繁的加鎖和釋放鎖操作會使程序的效率低下。與悲觀鎖相對是一種樂觀鎖操作CAS(CompareAndSwap),樂觀鎖就是每次去取數據的時候都樂觀的認為數據不會被修改,因此這個過程不會上鎖,但是在更新的時候會判斷一下在此期間的數據有沒有更新,如果沒有更新則去修改,否則失敗。可是上面這種 操作會出現ABA(A-B-A,中途被改變,但最后又改回原值)的問題,

針對上面的問題,java中可以使用Atomic,它的包名為java.util.concurrent.atomic。這個包里面提供了一組原子變量的操作類(通過值加版本號的方式去解決ABA問題),這些類可以保證在多線程環境下,當某個線程在執行atomic的方法時,不會被其他線程打斷,一直等到該方法執行完成(具體的API文檔可以查看參考文獻第5點)。

public class ThreadSafe implements Runnable {
    private static AtomicInteger count = new AtomicInteger(0);
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            count.getAndAdd(1);
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 20; i++) {
            es.execute(new ThreadSafe());
        }
        es.shutdown();  //不允許添加線程,異步關閉連接池
        es.awaitTermination(10L, TimeUnit.SECONDS); //等待連接池的線程任務完成
        System.out.println(count);
    }
}
/* output
 * 200000
 */

5. 其他方法解決線程同步

a. 自旋鎖

 線程循環反復檢查變量是否可用,在這一過程中線程一直保持執行(RUNNABLE),因此是一種忙等待,不像關鍵字synchronized一樣,一旦發現不能訪問,則處於線程處於阻塞狀態(BLOCKED)。

public class Thread6 implements Runnable{
    private static  final Lock lock = new ReentrantLock();
    private volatile static int count = 0;
    @Override
    public void run() {
        for (int i = 0; i < 1000000; i++){
            lock.lock();
            count++;
            lock.unlock();
        }
    }
    static void test(ExecutorService es) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            es.execute(new Thread6());
        }
        es.shutdown();  //不允許添加線程,異步關閉連接池
        es.awaitTermination(10L, TimeUnit.SECONDS); //等待連接池的線程任務完成
        System.out.println(count);
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(20);
        test(es);
    }
}

 如果在使用lock的時候包含了try...catch...語句,要注意的是lock 必須在 finally 塊中釋放。否則,如果受保護的代碼將拋出異常,鎖就有可能永遠得不到釋放!

 與Lock類同一個包java.util.concurrent.locks下還有一種讀寫分離的鎖ReentrantReadWriteLock類,讀寫鎖維護了一對鎖,一個讀鎖和一個寫鎖。一般情況下,讀寫鎖的性能都會比排它鎖好,因為大多數場景讀是多於寫的。在讀多於寫的情況下,讀寫鎖能夠提供比排它鎖更好的並發性和吞吐量。

  

public class RWTest {
    private static final Map<String, Object> map = new HashMap<String, Object>();
    private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private static final Lock readLock = lock.readLock();
    private static final Lock writeLock = lock.writeLock();

    public static final Object get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public static final Object put(String key, Object value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

    public static final void clear() {
        writeLock.lock();
        try {
            map.clear();
        } finally {
            writeLock.unlock();
        }
    }
}

 利用自旋鎖Lock也提供了Condition來實現線程間的狀態通知的,可以根據實際情況去喚醒某個線程(與后面的wait不同,是隨機的)或者所有線程。可以通過lock.newCondition()來獲取得Condition實例,可以根據實際需求創建多個實例。

public class Thread9 {
    public static ReentrantLock lock=new ReentrantLock();
    public static Condition condition =lock.newCondition();
    public static void main(String[] args) {
        new Thread(){
            @Override
            public void run() {
                lock.lock();//請求鎖
                try{
                    System.out.println(Thread.currentThread().getName()+"==》進入等待");
                    condition.await();//設置當前線程進入等待
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }finally{
                    lock.unlock();//釋放鎖
                }
                System.out.println(Thread.currentThread().getName()+"==》繼續執行");
            }
        }.start();
        new Thread(){
            @Override
            public void run() {
                lock.lock();//請求鎖
                try{
                    System.out.println(Thread.currentThread().getName()+"=》進入");
                    Thread.sleep(2000);//休息2秒
                    condition.signal();//隨機喚醒等待隊列中的一個線程
                    System.out.println(Thread.currentThread().getName()+"休息結束");
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }finally{
                    lock.unlock();//釋放鎖
                }
            }
        }.start();
    }
}
/*output
 *Thread-0==》進入等待
 *Thread-1=》進入
 *Thread-1休息結束
 *Thread-0==》繼續執行
 */

b. wait.notify.notifyAll

 在關鍵字synchronized的線程同步機制,調用線程的sleep,yield方法時,線程並不會讓出對象鎖,但是調用wait卻不同,線程自動釋放其占有的對象鎖,同時不會去申請對象鎖,當線程被喚醒的時候,它才再次去申請競爭對象的鎖(該關鍵字通常只與synchronized結合使用)。notify()喚醒在等待該對象同步鎖的線程(只喚醒一個,如果有多個在等待),注意的是在調用此方法的時候,並不能確切的喚醒某一個等待狀態的線程,而是由JVM確定喚醒哪個線程,而且不是按優先級。而notifyAll()則是喚醒所有等待的線程。

public class Thread8 implements Runnable {

    private int num;
    private Object lock;

    public Thread8(int num, Object lock) {
        this.num = num;
        this.lock = lock;
    }

    public void run() {
        try {
            while (true) {
                synchronized (lock) {
                    lock.notifyAll();
                    lock.wait();
                    System.out.println(num);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        final Object lock = new Object();
        Thread thread1 = new Thread(new Thread8(1, lock));
        Thread thread2 = new Thread(new Thread8(2, lock));
        thread1.start();
        thread2.start();
    }
}
/* output
 * 交替輸出1,2,1,2,1,2......
 */

6.並發編程—CountDownLatch、CyclicBarrier、Semaphore和fork/join框架

1. CountDownLatch

  CountDownLatch實現的是一個倒序計數器,可以通過調用它的countDown實現計數器減一和await方法來阻塞當前線程:

public class Thread10 {
    public static void main(String[] args) throws InterruptedException {
        int count = 20;
        final CountDownLatch cdl = new CountDownLatch(count);
        ExecutorService es = Executors.newCachedThreadPool();

        for (int i = 0; i < count; i++) {
            es.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(cdl.getCount());
                    }finally {
                        cdl.countDown();
                    }
                }
            });
        }
        cdl.await();
        es.shutdown();
        System.out.println("主線程現在才結束: count = "+cdl.getCount());
    }
}	

2.CyclicBarrier

  即回環柵欄,是一種可重用的線程阻塞器,它將率先到達柵欄的這些線程阻塞(調用await()方法),直到指定數量的線程都到達該處,這些線程將會被全部釋放。

public class Thread11 implements Runnable{
    private int num;
    private static CyclicBarrier cb = new CyclicBarrier(6); //指定柵欄的等待線程數
    public Thread11(int num){
        this.num = num;
    }
    @Override
    public void run() {
        try {
            Thread.sleep(1000*num);     //等待指定數量時間后到達柵欄處
            System.out.println(Thread.currentThread().getName() +" is coming..");
            cb.await(10L, TimeUnit.SECONDS);
            System.out.println("continue....");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        ExecutorService es = Executors.newCachedThreadPool();
        for (int i = 0; i < 8; i++) {
            es.execute(new Thread11(i));
        }
        es.shutdown();
    }
}
/*
 *pool-1-thread-1 is coming..
 *pool-1-thread-2 is coming..
 *pool-1-thread-3 is coming..
 *continue....
 *continue....
 *continue....
 *pool-1-thread-4 is coming..
 *超時異常錯誤(指定時間內線程數量仍然到達)
 */

3.Semaphore信號量

  信號量用於保護對一個或多個共享資源的訪問,其內部維護一個計數器,用來只是當前可以訪問共享資源的數量。可以通過tryAcquire去嘗試獲取許可,還可以通過availablePermits()方法得到可用的許可數目,而acquire/release則是獲取/釋放許可。

public class Thread12 implements Runnable {
    private static SecureRandom random= new SecureRandom();
    private static Semaphore semaphore = new Semaphore(3, true);

    @Override
    public void run() {
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() + " got permission...");
            Thread.sleep(random.nextInt(10000));
            semaphore.release();
            System.out.println(Thread.currentThread().getName() + " released permission...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ExecutorService es = Executors.newCachedThreadPool();
        for (int i = 0; i < 6; i++) {
            es.execute(new Thread12());
        }
        es.shutdown();
    }
}

4.fork/join框架

 Fork/Join框架提供了的一個用於並行執行任務的框架,充分利用了CPU資源,把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。(只供Java7使用)

Fork/Join使用兩個類:

  • ForkJoinTask:我們要使用ForkJoin框架,必須首先創建一個ForkJoin任務。它提供在任務中執行fork()和join()操作的機制,Fork/Join框架提供了以下兩個子類:

    • RecursiveAction:用於沒有返回結果的任務。

    • RecursiveTask :用於有返回結果的任務。

  • ForkJoinPool :ForkJoinTask需要通過ForkJoinPool來執行,任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中。

ForkJoinPool與其他類型的ExecutorService的不同之處主要在於使用工作竊取,每個線程都有自己的雙端任務隊列,線程在一般情況下會從隊列頭去獲取任務,當某個線程任務隊列的為空的時候,它會嘗試從其他線程的任務隊列的尾部去“竊取”任務來執行。

public class Thread13 extends RecursiveTask<Integer> {
    private int start;
    private int end;

    public Thread13(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int m = 1000;   //每個線程計算的范圍大小
        int s = start, n = end;  //每個線程計算的起始地址
        int r = 0;  //算和的變量
        List<ForkJoinTask<Integer>> it = new ArrayList<ForkJoinTask<Integer>>();
        while (s <= n) {
            if (n - s < m) {
                for (int i = s; i <= n; i++) {
                    r += i;
                }
            } else {
                n = Math.min(s + m - 1, n);        //得到一個新的start
                it.add(new Thread13(s, n).fork());  //得到每一個范圍[如(0,999)]加入一個線程
            }
            s = n + 1;
            n = end;
        }

        for (ForkJoinTask<Integer> t : it) {
            r += t.join();
        }
        return r;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool fjp = new ForkJoinPool();
        int s = 1, n = 10001;
        Future<Integer> rs = fjp.submit(new Thread13(s, n));
        System.out.println(rs.get());
    }
}
/* output
 * 50015001
 */

  


 參考文獻

  1. 龐永華. Java多線程與Socket:實戰微服務框架[M].電子工業出版社.2019.3

  2. Executors類中創建線程池的幾種方法的分析

  3. 知乎——如果你這樣回答“什么是線程安全”,面試官都會對你刮目相看

  4. 知乎——Java線程內存模型,線程、工作內存、主內存

  5. Java進階——Java中的Atomic原子特性

  6. 深入理解Java並發之synchronized實現原理

  7. Java的wait(), notify()和notifyAll()使用小結

  8. java多線程-07-Lock和Condition

  9. Java並發編程:CountDownLatch、CyclicBarrier和Semaphor


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM