並發編程筆記


進程與線程

進程

  • 程序由指令和數據組成,但這些指令要運行,數據要讀寫,就必須將指令加載至 CPU,數據加載至內存。在指令運行過程中還需要用到磁盤、網絡等設備。進程就是用來加載指令、管理內存、管理 IO 的
  • 當一個程序被運行,從磁盤加載這個程序的代碼至內存,這時就開啟了一個進程。
  • 進程就可以視為程序的一個實例。大部分程序可以同時運行多個實例進程(例如記事本、畫圖、瀏覽器等),也有的程序只能啟動一個實例進程(例如網易雲音樂、360 安全衛士等)

線程

  • 一個進程之內可以分為一到多個線程。
  • 一個線程就是一個指令流,將指令流中的一條條指令以一定的順序交給 CPU 執行
  • Java 中,線程作為最小調度單位,進程作為資源分配的最小單位。 在 windows 中進程是不活動的,只是作為線程的容器

兩者對比

  • 進程基本上相互獨立的,而線程存在於進程內,是進程的一個子集
  • 進程擁有共享的資源,如內存空間等,供其內部的線程共享
  • 進程間通信較為復雜
  • 同一台計算機的進程通信稱為 IPC(Inter-process communication)
  • 不同計算機之間的進程通信,需要通過網絡,並遵守共同的協議,例如 HTTP
  • 線程通信相對簡單,因為它們共享進程內的內存,一個例子是多個線程可以訪問同一個共享變量
  • 線程更輕量,線程上下文切換成本一般上要比進程上下文切換低

並發與並行

單核 cpu 下,線程實際還是 串行執行的。操作系統中有一個組件叫做任務調度器,將 cpu 的時間片(windows下時間片最小約為 15 毫秒)分給不同的程序使用,只是由於 cpu 在線程間(時間片很短)的切換非常快,人類感覺是 同時運行的 。總結為一句話就是: 微觀串行,宏觀並行 ,一般會將這種 線程輪流使用 CPU 的做法稱為並發

cpu 時間片1 時間片2 時間片3
core 線程1 線程2 線程3

image
多核 cpu下,每個 核(core)都可以調度運行線程,這時候線程可以是並行的。

cpu 時間片1 時間片2 時間片3
core 線程1 線程2 線程3
core 線程2 線程3 線程1

image

  • 並發(concurrent)是同一時間應對(dealing with)多件事情的能力
  • 並行(parallel)是同一時間動手做(doing)多件事情的能力

同步和異步

以調用方角度來講,如果需要等待結果返回,才能繼續運行就是同步,不需要等待結果返回,就能繼續運行就是異步
image

多核cpu才能夠提高效率,單核仍然是輪流執行,還多了線程切換的時間

  1. 單核 cpu 下,多線程不能實際提高程序運行效率,只是為了能夠在不同的任務之間切換,不同線程輪流使用cpu ,不至於一個線程總占用 cpu,別的線程沒法干活
  2. 多核 cpu 可以並行跑多個線程,但能否提高程序運行效率還是要分情況的,有些任務,經過精心設計,將任務拆分,並行執行,當然可以提高程序的運行效率。但不是所有計算任務都能拆分(參考后文的【阿姆達爾定律】)也不是所有任務都需要拆分,任務的目的如果不同,談拆分和效率沒啥意義
  3. IO 操作不占用 cpu,只是我們一般拷貝文件使用的是【阻塞 IO】,這時相當於線程雖然不用 cpu,但需要一直等待 IO 結束,沒能充分利用線程。所以才有后面的【非阻塞 IO】和【異步 IO】優化

Java線程

本章內容

  • 創建和運行線程
  • 查看線程
  • 線程 API
  • 線程狀態

創建核運行線程

       // 最常規的方式,不推薦
        new Thread("thread1"){
            @Override
            public void run() {
                System.out.println("我是另外一個線程");
            }
        }.start();
        System.out.println("你好啊啊哈哈哈Z");
        TimeUnit.SECONDS.sleep(1);
        System.out.println("啊哈哈哈");
       //線程和任務分開,更加靈活
       Runnable runable = ()->log.info("我是b線程");
       new Thread(runable,"t2").start();
       log.info("我是主線程");

        FutureTask<String> futureTask = new FutureTask<>(()->{
            log.info("我是另外一個線程");
            TimeUnit.SECONDS.sleep(3);
            return "啊哈哈哈";
        });
        new Thread(futureTask,"t1").start();
        log.info("我是主線程");
        // 間接實現了future接口,所以可以
        log.info(futureTask.get());

image

線程上下文切換

因為以下一些原因導致 cpu 不再執行當前的線程,轉而執行另一個線程的代碼
切換的原因

  • 線程的 cpu 時間片用完
  • 垃圾回收
  • 有更高優先級的線程需要運行
  • 線程自己調用了 sleep、yield、wait、join、park、synchronized、lock 等方法
    當 Context Switch 發生時,需要由操作系統保存當前線程的狀態,並恢復另一個線程的狀態,Java 中對應的概念
  • 就是程序計數器(Program Counter Register),它的作用是記住下一條 jvm 指令的執行地址,是線程私有的
  • 狀態包括程序計數器、虛擬機棧中每個棧幀的信息,如局部變量、操作數棧、返回地址等
  • Context Switch 頻繁發生會影響性能
    image
    image
相關方法解析

run和start

  • 直接調用 run 是在主線程中執行了 run,沒有啟動新的線程
  • 使用 start 是啟動新的線程,通過新的線程間接執行 run 中的代碼
    sleep和yield
  1. 調用 sleep 會讓當前線程從 Running 進入 Timed Waiting 狀態(阻塞),此時任務調度器不會分配時間片給該線程
  2. 其它線程可以使用 interrupt 方法打斷正在睡眠的線程,這時 sleep 方法會拋出 InterruptedException
  3. 睡眠結束后的線程未必會立刻得到執行
  4. 建議用 TimeUnit 的 sleep 代替 Thread 的 sleep 來獲得更好的可讀性
    yield意思為禮讓一下
  5. 調用 yield 會讓當前線程從 Running 進入 Runnable 就緒狀態,然后調度執行線程,此時任務調度器可以分配時間片給該線程
  6. 具體的實現依賴於操作系統的任務調度器
    可以通過thread.setPriority(Thread.MAX_PRIORITY)設置線程優先級,在cpu繁忙的時候,設置了優先級會有更大的機會優先執行,空閑的時候沒有什么作用
        Runnable task1 = () -> {
            int count = 0;
            for (; ; ) {
                System.out.println("---->1 " + count++);
            }
        };
        Runnable task2 = () -> {
            int count = 0;
            for (; ; ) {
        // Thread.yield();
                System.out.println(" ---->2 " + count++);
            }
        };
        Thread t1 = new Thread(task1, "t1");
        Thread t2 = new Thread(task2, "t2");
        // t1.setPriority(Thread.MIN_PRIORITY);
        // t2.setPriority(Thread.MAX_PRIORITY);
        t1.start();
        t2.start();

sleep可以避免cpu空轉導致cpu被占滿

join方法詳解

static int r = 10;
    @Test
    public void test() throws InterruptedException {
        Thread thread = new Thread(() -> {
            log.info("線程開始更改r變量");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            r = 100;
        });
        thread.start();
        thread.join();
        log.info("r的值為{}", r);
    }

如下代碼,如果不實用join,此時r的直還沒有更改,顯示的還是更改前的數值。
join的作用為等待某個線程運行結束
多個join情況

 @Test
    public void test() throws InterruptedException {
        Thread thread = new Thread(() -> {
            try {
                log.info("t1開始");
                TimeUnit.SECONDS.sleep(1);
                log.info("t1結束");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t1");
        Thread thread1 = new Thread(() -> {
            try {
                log.info("t2開始");
                TimeUnit.SECONDS.sleep(2);
                log.info("t2結束");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t2");
        log.info("主線程開始");
        thread.start();
        thread1.start();
        thread.join();
        thread1.join();
        log.info("主線程結束");
    }

相關結果
image
只需要等待兩秒
如果顛倒兩個join,示意圖如下
image
有時效的join

        Thread thread = new Thread(() -> {
            try {
                log.info("t1開始");
                TimeUnit.SECONDS.sleep(2);
                log.info("t1結束");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t1");
        log.info("主線程開始");
        thread.start();
        thread.join(1);
        log.info("主線程結束");

結果如下
image
主線程結束那么就結束了。如果沒等夠結束繼續向下運行。

interrupt 方法詳解

sleep,join,wait(后面介紹)會使該線程處於阻塞狀態,處於改狀態的線程cpu將不會分配時間片給該線程,這也是sleep可以降低cpu的使用率的原因。處於阻塞狀態下的線程可以被打斷,此時會拋出異常。打斷處於阻塞狀態下的線程,此時不會處理打斷狀態,打斷正常運行下的線程,打斷標記會置為真,可以用來優雅的停止線程

	@Test
    public void test()  {
        Thread thread = new Thread(() -> {
            sleep(2);
        },"t1");
        log.info("主線程開始");
        thread.start();
        sleep(1);
        thread.interrupt();
        log.info(thread.isInterrupted()+"");
        log.info("主線程結束");
    }

image
需要在線程中自己處理打斷的狀態,代碼如下

兩階段終止模式(在t1線程中優雅的停止t2線程)

image

        Thread thread = new Thread(() -> {
            while (true) {
                try {
                    if (Thread.currentThread().isInterrupted()){
                        log.info("我被打斷了(▰˘︹˘▰)");
                        break;
                    }
                    log.info("執行監控");
                    sleep(2);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        },"t1");
        thread.start();
        sleep(5);
        log.info("主線程打斷監控線程");
        thread.interrupt();
        sleep(3);
        log.info("主線程結束");

image

    @Test
    public void test() throws InterruptedException {
        // 處於這個狀態也是阻塞
        // 當被打算后打斷標志為真
        Thread thread = new Thread(()->{
            log.info("開始打斷");
            LockSupport.park();
           
            log.info("打斷標志{}",Thread.currentThread().isInterrupted());
            // 當打斷標志為真的時候再次執行park時效,打斷標志為加的時候才有效, 
            // Thread.interrupted()可以清除打斷標志
            LockSupport.park();
            log.info("啦啦啦");
        });
        thread.start();
        sleep(2);
        thread.interrupt();
        sleep(1);
    }

image

守護線程

image
我在本地實驗沒有達到這個效果。main線程結束后就沒有下文了。

線程的狀態

image

  • NEW 線程剛被創建,但是還沒有調用 start() 方法
  • RUNNABLE 當調用了 start() 方法之后,注意,Java API 層面的 RUNNABLE 狀態涵蓋了 操作系統 層面的
  • 【可運行狀態】、【運行狀態】和【阻塞狀態】(由於 BIO 導致的線程阻塞,在 Java 里無法區分,仍然認為是可運行)
  • BLOCKED , WAITING , TIMED_WAITING 都是 Java API 層面對【阻塞狀態】的細分,后面會在狀態轉換一節詳述
  • TERMINATED 當線程代碼運行結束
        Thread t1 = new Thread("t1") {
            @Override
            public void run() {
                log.debug("running...");
            }
        };

        Thread t2 = new Thread("t2") {
            @Override
            public void run() {
                while(true) { // runnable

                }
            }
        };
        t2.start();

        Thread t3 = new Thread("t3") {
            @Override
            public void run() {
                log.debug("running...");
            }
        };
        t3.start();

        Thread t4 = new Thread("t4") {
            @Override
            public void run() {
                synchronized (TestState.class) {
                    try {
                        Thread.sleep(1000000); // timed_waiting
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        t4.start();

        Thread t5 = new Thread("t5") {
            @Override
            public void run() {
                try {
                    t2.join(); // waiting
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        t5.start();

        Thread t6 = new Thread("t6") {
            @Override
            public void run() {
                synchronized (TestState.class) { // blocked
                    try {
                        Thread.sleep(1000000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        t6.start();

        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("t1 state {}", t1.getState());
        log.debug("t2 state {}", t2.getState());
        log.debug("t3 state {}", t3.getState());
        log.debug("t4 state {}", t4.getState());
        log.debug("t5 state {}", t5.getState());
        log.debug("t6 state {}", t6.getState());
        System.in.read();

線程的6種狀態代碼示例如上
多線程統籌的事例
image

        Thread t1 = new Thread(()->{
            log.info("洗水壺");
            sleep(1);
            log.info("燒開水");
            sleep(15);
        },"t1");
        Thread t2 = new Thread(()->{
            log.info("洗茶壺");
            sleep(1);
            log.info("洗茶杯");
            sleep(2);
            log.info("拿茶葉");
            sleep(1);
            try {
                t1.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("泡茶");

        },"t2");
        t1.start();
        t2.start();

image
需要花費16秒

共享模型之管程

本章內容

  • 共享問題
  • synchronized
  • 線程安全分析
  • Monitor
  • wait/notify
  • 線程狀態轉換
  • 活躍性
  • Lock
    image
    image
    image
    image
    多線程操作共享資源此時會有線程安全的問題
        Thread t1 = new Thread(()->{
            for (int i = 0; i < 5000; i++) {
                n++;
            }
        },"t1");
        Thread t2 = new Thread(()->{
            for (int i = 0; i < 500; i++) {
                n--;
            }

        },"t2");
        t1.start();
        t2.start();
        t2.join();
        t1.join();
        log.info("最終n的結果為{}",n);

發現最終n的結果不是0
image

在多個線程對共享資源讀寫操作時發生指令交錯,就會出現問題

一段代碼塊內如果存在對共享資源的多線程讀寫操作,稱這段代碼塊為臨界區
既有讀又有寫
多個線程在臨界區內執行,由於代碼的執行序列不同而導致結果無法預測,稱之為發生了競態條件
為了避免臨界區的競態條件發生,有多種手段可以達到目的。

  • 阻塞式的解決方案:synchronized,Lock
  • 非阻塞式的解決方案:原子變量
    當一個對象擁有鎖的時候,其他線程會進入到阻塞狀態,獲取鎖的執行完畢會喚醒拿不到該鎖阻塞的線程
    synchronized (Demo01.class){n++;}
    image
  • synchronized(對象) 中的對象,可以想象為一個房間(room),有唯一入口(門)房間只能一次進入一人進行計算,線程 t1,t2 想象成兩個人
  • 當線程 t1 執行到 synchronized(room) 時就好比 t1 進入了這個房間,並鎖住了門拿走了鑰匙,在門內執行count++ 代碼
  • 這時候如果 t2 也運行到了 synchronized(room) 時,它發現門被鎖住了,只能在門外等待,發生了上下文切換,阻塞住了
  • 這中間即使 t1 的 cpu 時間片不幸用完,被踢出了門外(不要錯誤理解為鎖住了對象就能一直執行下去哦),
  • 這時門還是鎖住的,t1 仍拿着鑰匙,t2 線程還在阻塞狀態進不來,只有下次輪到 t1 自己再次獲得時間片時才能開門進入
  • 當 t1 執行完 synchronized{} 塊內的代碼,這時候才會從 obj 房間出來並解開門上的鎖,喚醒t2 線程把鑰匙給他。t2 線程這時才可以進入 obj 房間,鎖住了門拿上鑰匙,執行它的 count-- 代碼
    image
    synchronized 實際是用對象鎖保證了臨界區內代碼的原子性,臨界區內的代碼對外是不可分割的,不會被線程切換所打斷。
    改造
    更符合面相對象線程操作資源類的思想
    @Test
    public  void test() throws InterruptedException {
        Room room = new Room();
        Thread t1 = new Thread(()->{
            for (int i = 0; i < 5000; i++) {
                room.increment();
            }
        },"t1");
        Thread t2 = new Thread(()->{
            for (int i = 0; i < 500; i++) {
                room.decrement();
            }

        },"t2");
        t1.start();
        t2.start();
        t2.join();
        t1.join();
        log.info("最終n的結果為{}",room.get());
    }

    class Room {
        int value = 0;
        public void increment() {
            synchronized (this) {
                value++;
            }
        }
        public void decrement() {
            synchronized (this) {
                value--;
            }
        }
        public int get() {
            synchronized (this) {
                return value;
            }
        }

        public Room() {
        }
    }

靜態方法中synchronized等價於鎖是該類,非靜態方法鎖是該對象的實例

線程安全問題

  1. 如果它們沒有共享,則線程安全
  2. 如果它們被共享了,根據它們的狀態是否能夠改變,又分兩種情況
    • 如果只有讀操作,則線程安全
    • 如果有讀寫操作,則這段代碼是臨界區,需要考慮線程安全
  • 局部變量是線程安全的
  • 但局部變量引用的對象則未必
    • 如果該對象沒有逃離方法的作用訪問,它是線程安全的
    • 如果該對象逃離方法的作用范圍,需要考慮線程安全
      成員變量的線程安全問題
class ThreadUnsafe {
    ArrayList<String> list = new ArrayList<>();
    public void method1(int loopNumber) {
        for (int i = 0; i < loopNumber; i++) {
            method2();
            method3();
        }
    }

    private void method2() {
        list.add("1");
    }

    private void method3() {
        list.remove(0);
    }

原因:多個線程添加的時候有可能只添加成功一次,多個線程執行刪除的時候執行了多次
list.add不是原子操作
常見的線程安全類

  • String
  • Integer
  • StringBuffer
  • Random
  • Vector
  • Hashtable
  • java.util.concurrent 包下的類
    線程安全指的是多個線程調用他們類的方法是安全的。即可以理解為他們的方法都是原子性的。但是組合起來不是原子的
    開閉原則中不想讓子類覆蓋的方法可以將該類設置成為final,如string
    多人買票線程代碼分析
public class ExerciseSell {
    public static void main(String[] args) throws InterruptedException {
        // 模擬多人買票
        TicketWindow window = new TicketWindow(1000);

        // 所有線程的集合
        List<Thread> threadList = new ArrayList<>();
        // 賣出的票數統計
        List<Integer> amountList = new Vector<>();
        for (int i = 0; i < 2000; i++) {
            Thread thread = new Thread(() -> {
                // 買票
                int amount = window.sell(random(5));
                // 統計買票數
                amountList.add(amount);
            });
            threadList.add(thread);
            thread.start();
        }

        for (Thread thread : threadList) {
            thread.join();
        }

        // 統計賣出的票數和剩余票數
        log.debug("余票:{}",window.getCount());
        log.debug("賣出的票數:{}", amountList.stream().mapToInt(i-> i).sum());
    }

    // Random 為線程安全
    static Random random = new Random();

    // 隨機 1~5
    public static int random(int amount) {
        return random.nextInt(amount) + 1;
    }
}

// 售票窗口
class TicketWindow {
    private int count;

    public TicketWindow(int count) {
        this.count = count;
    }

    // 獲取余票數量
    public int getCount() {
        return count;
    }

    // 售票
    public synchronized int sell(int amount) {
        if (this.count >= amount) {
            this.count -= amount;
            return amount;
        } else {
            return 0;
        }
    }
}

下面的這個由於涉及到兩個對象,因此需要鎖住該類

@Slf4j(topic = "c.ExerciseTransfer")
public class ExerciseTransfer {
    public static void main(String[] args) throws InterruptedException {
        Account a = new Account(1000);
        Account b = new Account(1000);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                a.transfer(b, randomAmount());
            }
        }, "t1");
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                b.transfer(a, randomAmount());
            }
        }, "t2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // 查看轉賬2000次后的總金額
        log.debug("total:{}", (a.getMoney() + b.getMoney()));
    }

    // Random 為線程安全
    static Random random = new Random();

    // 隨機 1~100
    public static int randomAmount() {
        return random.nextInt(100) + 1;
    }
}

// 賬戶
class Account {
    private int money;

    public Account(int money) {
        this.money = money;
    }

    public int getMoney() {
        return money;
    }

    public void setMoney(int money) {
        this.money = money;
    }

    // 轉賬
    public void transfer(Account target, int amount) {
        synchronized(Account.class) {
            if (this.money >= amount) {
                this.setMoney(this.getMoney() - amount);
                target.setMoney(target.getMoney() + amount);
            }
        }
    }
}

對象頭的相關信息

image
image

intege占用空間分析,一個int占用4個字節,然后一個包裝對象需要包含對象頭核對象值,對象頭需要占用8個字節,int中一個包裝類型是普通類型的4倍
image
obj中的對象頭里記錄着對象的鎖的地址
image
后面synchronized的原理感覺很復雜,后面再看
image
image

wait 和notify

    @Test
    public  void test() throws InterruptedException {
        Object lock  = new Object();
        new Thread(()->{
            synchronized (lock) {
                log.info("我睡眠了");
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("我被喚醒了");
            }
        },"t1").start();
        new Thread(()->{
            synchronized (lock) {
                log.info("我睡眠了");
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("我被喚醒了");
            }
        },"t2").start();
        TimeUnit.SECONDS.sleep(2);
        synchronized (lock){
            log.info("我隨機喚醒一個");
            lock.notify();
        }
    }

wait和notify的正確姿勢
image

    public static void main(String[] args) {

        new Thread(() -> {
            synchronized (room) {
                log.debug("有煙沒?[{}]", hasCigarette);
				// 用 notifyAll 僅解決某個線程的喚醒問題,但使用 if + wait 判斷僅有一次機會,一旦條件不成立,就沒有重新判斷的機會了
                while (!hasCigarette) {
                    log.debug("沒煙,先歇會!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("有煙沒?[{}]", hasCigarette);
                if (hasCigarette) {
                    log.debug("可以開始干活了");
                } else {
                    log.debug("沒干成活...");
                }
            }
        }, "小南").start();

        new Thread(() -> {
            synchronized (room) {
                Thread thread = Thread.currentThread();
                log.debug("外賣送到沒?[{}]", hasTakeout);
                while (!hasTakeout) {
                    log.debug("沒外賣,先歇會!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("外賣送到沒?[{}]", hasTakeout);
                if (hasTakeout) {
                    log.debug("可以開始干活了");
                } else {
                    log.debug("沒干成活...");
                }
            }
        }, "小女").start();

        sleep(1);
        new Thread(() -> {
            synchronized (room) {
                hasTakeout = true;
                log.debug("外賣到了噢!");
                room.notifyAll();
            }
        }, "送外賣的").start();


    }

保護性暫停

  • 有一個結果需要從一個線程傳遞到另一個線程,讓他們關聯同一個 GuardedObject
  • 如果有結果不斷從一個線程到另一個線程那么可以使用消息隊列(見生產者/消費者)
  • JDK 中,join 的實現、Future 的實現,采用的就是此模式
  • 因為要等待另一方的結果,因此歸類到同步模式
    image

主線程等待另外一個線程獲取結果

public class Demo01 {
    public static void main(String[] args) throws InterruptedException {
        GuardObject guardObject = new GuardObject();
        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("獲取到了結果");
            guardObject.complete("hello");
        }).start();
        log.info("主線程等待結果");
        log.info(""+guardObject.get());
    }

}
class GuardObject{
    private Object response;
    private final Object lock = new Object();
    public Object get() throws InterruptedException {
        synchronized (lock) {
            while (Objects.isNull(response)){
                lock.wait();
            }
            return response;
        }
    }
    public void complete(Object response) {
        synchronized (lock) {
            this.response = response;
            lock.notifyAll();
        }
    }
}

join底層就是應用該線程進行操作
帶超時時間的等待
image
多任務的
image

相關事例

@Data
@Slf4j
public class Demo01 {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            new People().start();
        }
        TimeUnit.SECONDS.sleep(2);
        for (Integer id : Mailboxes.getIds()) {
            new Postman(id, "內容" + id).start();
        }
    }

}
@Slf4j
class People extends Thread{
    @Override
    public void run() {
        GuardedObject guardedObject = Mailboxes.createGuardedObject();
        log.info("開始收信 id:{}", guardedObject.getId());
        Object mail = guardedObject.get(5000);
        log.info("收到信 id:{}, 內容:{}", guardedObject.getId(), mail);
    }
}
@Slf4j
class Postman extends Thread {
    private int id;
    private String mail;
    public Postman(int id, String mail) {
        this.id = id;
        this.mail = mail;
    }
    @Override
    public void run() {
        GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
        log.info("送信 id:{}, 內容:{}", id, mail);
        guardedObject.complete(mail);
    }
}
class Mailboxes {
    private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
    private static int id = 1;
    // 產生唯一 id
    private static synchronized int generateId() {
        return id++;
    }
    public static GuardedObject getGuardedObject(int id) {
        return boxes.remove(id);
    }
    public static GuardedObject createGuardedObject() {
        GuardedObject go = new GuardedObject(generateId());
        boxes.put(go.getId(), go);
        return go;
    }
    public static Set<Integer> getIds() {
        return boxes.keySet();
    }
}
// 增加超時效果
class GuardedObject {

    // 標識 Guarded Object
    private int id;

    public GuardedObject(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    // 結果
    private Object response;

    // 獲取結果
    // timeout 表示要等待多久 2000
    public Object get(long timeout) {
        synchronized (this) {
            // 開始時間 15:00:00
            long begin = System.currentTimeMillis();
            // 經歷的時間
            long passedTime = 0;
            while (Objects.isNull(response)) {
                // 這一輪循環應該等待的時間
                long waitTime = timeout - passedTime;
                // 經歷的時間超過了最大等待時間時,退出循環
                if (waitTime <= 0) {
                    break;
                }
                try {
                    this.wait(waitTime); // 虛假喚醒 15:00:01
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 求得經歷時間
                passedTime = System.currentTimeMillis() - begin; // 15:00:02  1s
            }
            return response;
        }
    }

    // 產生結果
    public void complete(Object response) {
        synchronized (this) {
            // 給結果成員變量賦值
            this.response = response;
            this.notifyAll();
        }
    }
}

以上是生產者和消費者一一對應,生產的消息回立即被消費

生產者和消費者

image

public class Demo02 {
    public static void main(String[] args) throws InterruptedException {
        MessageQueue messageQueue = new MessageQueue(2);
        for (int i = 0; i < 3; i++) {
            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info( messageQueue.take().toString());
            },"消費者"+i).start();
        }
        for (int i = 0; i < 5; i++) {
            int id = i;
            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("防止的id為"+id);
                messageQueue.put(new Message(id,"消息"+id));
            },"生產者"+i).start();
        }

    }

}
@Data
class Message {
    private int id;
    private Object message;

    public Message(int id, Object message) {
        this.id = id;
        this.message = message;
    }

    public int getId() {
        return id;
    }

    public Object getMessage() {
        return message;
    }
}
@Slf4j
class MessageQueue {
    private LinkedList<Message> queue;
    private int capacity;
    public MessageQueue(int capacity) {
        this.capacity = capacity;
        queue = new LinkedList<>();
    }
    public Message take() {
       synchronized (queue) {
           while (queue.isEmpty()){
               log.info("隊列為空,等待...");
               try {
                   queue.wait();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
           queue.notifyAll();
          return queue.removeFirst();
       }
    }
    public void put(Message message) {
        synchronized (queue) {
            while (queue.size()==capacity){
                log.info("隊列滿了,等待...");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(message);
            queue.notifyAll();
        }
    }
}

也可以消費者只有一個,一直循環

park和unpark

    public static void main(String[] args) throws InterruptedException {
         Thread t1 = new Thread(()->{
             log.info("我被park了");
             LockSupport.park();
             log.info("我又被unpark了");
         },"t1");
         t1.start();
        TimeUnit.SECONDS.sleep(2);
        LockSupport.unpark(t1);
    }

先unpark然后再park的效果

    public static void main(String[] args) throws InterruptedException {
         Thread t1 = new Thread(()->{
             try {
                 TimeUnit.SECONDS.sleep(2);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             log.info("我被park了");
             LockSupport.park();
             log.info("我又被unpark了");
         },"t1");
         t1.start();
        TimeUnit.SECONDS.sleep(1);
        LockSupport.unpark(t1);

此時日志輸出
image

  • park & unpark 是以線程為單位來【阻塞】和【喚醒】線程,而 notify 只能隨機喚醒一個等待線程,notifyAll
  • 是喚醒所有等待線程,就不那么【精確】park & unpark 可以先 unpark,而 wait & notify 不能先 notify
    image
    image

線程狀態轉換

image

  1. new->runnable 調用start方法
  2. RUNNABLE <--> WAITING t 線程用 synchronized(obj) 獲取了對象鎖后 調用 obj.wait() 方法時,t 線程從 RUNNABLE --> WAITING 調用 obj.notify() , obj.notifyAll() , t.interrupt() 時 競爭鎖成功,t 線程從 WAITING --> RUNNABLE 競爭鎖失敗,t 線程從 WAITING --> BLOCKED當前線程調用 t.join() 方法時,當前線程從 RUNNABLE --> WAITING注意是當前線程在t 線程對象的監視器上等待t 線程運行結束,或調用了當前線程的 interrupt() 時,當前線程從 WAITING --> RUNNABLE 當前線程調用 LockSupport.park() 方法會讓當前線程從 RUNNABLE --> WAITING 調LockSupport.unpark(目標線程) 或調用了線程 的 interrupt() ,會讓目標線程從WAITING -->RUNNABLE
  3. RUNNABLE <--> TIMED_WAITINGimage
  4. RUNNABLE <--> BLOCKED t 線程用 synchronized(obj) 獲取了對象鎖時如果競爭失敗,從 RUNNABLE --> BLOCKED持 obj 鎖線程的同步代碼塊執行完畢,會喚醒該對象上所有 BLOCKED 的線程重新競爭,如果其中 t 線程競爭成功,從 BLOCKED --> RUNNABLE ,其它失敗的線程仍然 BLOCKE

降低鎖的粒度,准備多把鎖

class BigRoom {
    private final Object studyRoom = new Object();
    private final Object bedRoom = new Object();

    public void sleep() {
        synchronized (bedRoom) {
            log.debug("sleeping 2 小時");
            TimeUnit.SECONDS.sleep(2);
        }
    }

    public void study() {
        synchronized (studyRoom) {
            log.debug("study 1 小時");
           TimeUnit.SECONDS.sleep(1);
        }
    }
}

需要保證這兩個方法沒有關聯,可以提高並發度,但是這種情況下容易造成死鎖。

        Object A = new Object();
        Object B = new Object();
        Thread t1 = new Thread(() -> {
            synchronized (A) {
                log.debug("lock A");
                sleep(1);
                synchronized (B) {
                    log.debug("lock B");
                    log.debug("操作...");
                }
            }
        }, "t1");
        Thread t2 = new Thread(() -> {
            synchronized (B) {
                log.debug("lock B");
                sleep(0.5);
                synchronized (A) {
                    log.debug("lock A");
                    log.debug("操作...");
                }
            }
        }, "t2");
        t1.start();
        t2.start();

哲學家死鎖問題
相互持有對方的鎖不放開

public class TestDeadLock {
    public static void main(String[] args) {
        Chopstick c1 = new Chopstick("1");
        Chopstick c2 = new Chopstick("2");
        Chopstick c3 = new Chopstick("3");
        Chopstick c4 = new Chopstick("4");
        Chopstick c5 = new Chopstick("5");
        new Philosopher("蘇格拉底", c1, c2).start();
        new Philosopher("柏拉圖", c2, c3).start();
        new Philosopher("亞里士多德", c3, c4).start();
        new Philosopher("赫拉克利特", c4, c5).start();
        new Philosopher("阿基米德", c1, c5).start();
    }
}

@Slf4j(topic = "c.Philosopher")
class Philosopher extends Thread {
    Chopstick left;
    Chopstick right;

    public Philosopher(String name, Chopstick left, Chopstick right) {
        super(name);
        this.left = left;
        this.right = right;
    }

    @Override
    public void run() {
        while (true) {
            // 嘗試獲得左手筷子
            synchronized (left) {
                // 嘗試獲得右手筷子
                synchronized (right) {
                    eat();
                }
            }
        }
    }

    Random random = new Random();
    private void eat() {
        log.debug("eating...");
        Sleeper.sleep(0.5);
    }
}

class Chopstick {
    String name;

    public Chopstick(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "筷子{" + name + '}';
    }
}

活鎖

兩個線程改變了對方的結束條件,誰也無法結束
image
解決死鎖可以一次性獲得所有的鎖,這種情況下會產生飢餓問題。

線程飢餓問題

image

ReentrantLock

相對於 synchronized 它具備如下特點

  • 可中斷
  • 可以設置超時時間
  • 可以設置為公平鎖
  • 支持多個條件變量
  • 與 synchronized 一樣,都支持可重入
基本語法
// 獲取鎖
reentrantLock.lock();
try {
	// 臨界區
	} finally {
	// 釋放鎖
	reentrantLock.unlock();
}
可重入

可重入是指同一個線程如果首次獲得了這把鎖,那么因為它是這把鎖的擁有者,因此有權利再次獲取這把鎖如果是不可重入鎖,那么第二次獲得鎖時,自己也會被鎖擋住

static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) throws InterruptedException {
        lock.lock();
        try {
            method1();
        } finally {
            lock.unlock();
        }
    }
    public static void method1(){
        lock.lock();
        try {
            log.info("進入到method1");
            method2();
        }finally {
            lock.unlock();
        }
    }

    private static void method2() {
        lock.lock();
        try {
            log.info("進入到method2");
        }finally {
            lock.unlock();
        }
    }
可被打斷
 static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) throws InterruptedException {
       Thread t1 = new Thread(()->{
           try {
               lock.lockInterruptibly();
           } catch (InterruptedException e) {
               e.printStackTrace();
               log.info("獲得所的過程中被打斷");
           }
           try {
               log.info("獲取到了鎖");
           }finally {
               lock.unlock();
           }
       },"t1");
       lock.lock();
        t1.start();
        try {
            Sleeper.sleep(1);
            log.info("打斷鎖");
            t1.interrupt();
        } finally {
            lock.unlock();
        }
    }

image
雖然被打斷了,但是還是可以獲取的到

可超時
public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            log.info("啟動...");
            // 可以加時間
            try {
                if (!lock.tryLock(2, TimeUnit.SECONDS)) {
                    log.info("獲取立刻失敗,返回");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                log.info("獲得了鎖");
            } finally {
                lock.unlock();
            }
        }, "t1");
        lock.lock();
        log.info("獲得了鎖");
        t1.start();
        try {
            Sleeper.sleep(3);
        } finally {
            lock.unlock();
        }
    }

image

公平鎖,在創建鎖的時候指定是否公平
條件變量
  • synchronized 中也有條件變量,wait notify 就是我們講原理時那個 waitSet 休息室,當條件不滿足時進入 waitSet 等待ReentrantLock 的條件變量比 synchronized 強大之處在於,它是支持多個條件變量的,這就好比synchronized 是那些不滿足條件的線程都在一間休息室等消息而 ReentrantLock 支持多間休息室,有專門等煙的休息室、專門等早餐的休息室、喚醒時也是按休息室來喚醒

使用要點:

  • await 前需要獲得鎖
  • await 執行后,會釋放鎖,進入 conditionObject 等待
  • await 的線程被喚醒(或打斷、或超時)取重新競爭 lock 鎖
  • 競爭 lock 鎖成功后,從 await 后繼續執行
    static ReentrantLock lock = new ReentrantLock();
    static Condition waitCigaretteQueue = lock.newCondition();
    static Condition waitbreakfastQueue = lock.newCondition();
    static volatile boolean hasCigrette = false;
    static volatile boolean hasBreakfast = false;
    public static void main(String[] args) throws InterruptedException {
        new Thread(()->{
            lock.lock();
            try {
                while (!hasCigrette) {
                    log.info("沒有煙,不干活");
                    waitCigaretteQueue.await();
                }
                // 此處和wait 相比不許再判斷
                log.info("干活");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        },"t1").start();
        new Thread(()->{
            lock.lock();
            try {
                while (!hasBreakfast) {
                    log.info("沒有早餐,不干活");
                    waitbreakfastQueue.await();
                }
                // 此處和wait 相比不許再判斷
                log.info("干活");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        },"t2").start();
        lock.lock();
        try {
            log.info("喚醒早餐和香煙");
            Sleeper.sleep(2);
            hasBreakfast = true;
            hasCigrette = true;
            waitbreakfastQueue.signal();
            waitCigaretteQueue.signal();
        } finally {
            lock.unlock();
        }

    }
本章小結

本章我們需要重點掌握的是

  • 分析多線程訪問共享資源時,哪些代碼片段屬於臨界區
  • 使用 synchronized 互斥解決臨界區的線程安全問題
    • 掌握 synchronized 鎖對象語法
    • 掌握 synchronzied 加載成員方法和靜態方法語法
    • 掌握 wait/notify 同步方法
  • 使用 lock 互斥解決臨界區的線程安全問題
    • 掌握 lock 的使用細節:可打斷、鎖超時、公平鎖、條件變量
  • 學會分析變量的線程安全性、掌握常見線程安全類的使用
  • 了解線程活躍性問題:死鎖、活鎖、飢餓
  • 應用方面
    • 互斥:使用 synchronized 或 Lock 達到共享資源互斥效果
    • 同步:使用 wait/notify 或 Lock 的條件變量來達到線程間通信效果
  • 原理方面
    • monitor、synchronized 、wait/notify 原理
    • synchronized 進階原理
    • park & unpark 原理
  • 模式方面
    • 同步模式之保護性暫停
    • 異步模式之生產者消費者
    • 同步模式之順序控制


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM