三個線程交替按順序打印ABC之條件隊列的理解


如題。本文給出交替打印的代碼示例,並解釋了條件變量在代碼實現中所起的作用。

  • 使用三個線程,一個只負責打印A,另一個只負責打印B,最后一個只負責打印C
  • 按順序交替。即打印A后,才能打印B,打印B后,才能打印C

由於按序交替,最好采用條件隊列來實現。初始時,只有打印A的條件滿足 打印B、C的條件都不滿足。A打印后,使得打印B的條件滿足,同時打印A的條件由原來的滿足變成不滿足;B打印后,使得打印C的條件滿足,同時打印B的條件由原來的滿足變成不滿足;C打印后,使得打印A的條件滿足,同時打印C的條件由原來的滿足變成不滿足。

采用鎖+條件隊列實現的優勢:
鎖+條件隊列是基於"通知-喚醒"機制實現的,比sleep+輪詢的方式要高效。這篇文章最后第6點簡要說明了這2種機制。

完整代碼如下:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author psj
 * @date 20-3-7
 */
public class PrintABC {
    private ReentrantLock lock = new ReentrantLock();
    //與鎖關聯的條件隊列,當打印條件不滿足時,掛起線程(通知喚醒機制,而不是sleep或者輪詢)
    private Condition printA = lock.newCondition();
    private Condition printB = lock.newCondition();
    private Condition printC = lock.newCondition();

    //初始化 打印A的條件成立,打印B不成立,打印C不成立
    private volatile boolean isA = true;
    private volatile boolean isB = false;
    private volatile boolean isC = false;


    public static void main(String[] args) {
        PrintABC pabc = new PrintABC();

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        pabc.printA();
                    } catch (InterruptedException e) {
                        System.out.println(Thread.currentThread().getName() + " 退出打印");
                        break;
                    }
                }
            }
        }, "t1");

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        pabc.printB();
                    } catch (InterruptedException e) {
                        //響應中斷退出打印
                        System.out.println(Thread.currentThread().getName() + " 退出打印");
                        break;
                    }
                }
            }
        }, "t2");

        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        pabc.printC();
                    } catch (InterruptedException e) {
                        System.out.println(Thread.currentThread().getName() + " 退出打印");
                        break;
                    }
                }
            }
        }, "t3");

        t2.start();
        t3.start();
        t1.start();


//        sleepMills(10 * 1000);
//        t1.interrupt();
    }

    public void printA() throws InterruptedException{
        try {
            lock.lock();
            while (!isA) {
                printA.await();
            }
            System.out.println(Thread.currentThread().getName() + " print A");
            sleepMills(2000);
            //A 已打印,將打印A的條件由原來的滿足變成不滿足
            isA = false;
            //將打印B的條件變成滿足
            isB = true;
            //通知線程打印B
            printB.signal();
        }finally {
            lock.unlock();
        }
    }

    public void printB()throws InterruptedException {
        try {
            lock.lock();
            while (!isB) {
                printB.await();
            }
            System.out.println(Thread.currentThread().getName() + " print B");
            //模擬方法執行耗時
            sleepMills(2000);
            //打印B的條件由滿足變成不滿足
            isB = false;
            //使得打印C的條件變成滿足
            isC = true;
            printC.signal();
        }finally {
            lock.unlock();
        }
    }

    public void printC()throws InterruptedException {
        try {
            lock.lock();
            while (!isC) {
                printC.await();
            }
            System.out.println(Thread.currentThread().getName() + " print C");
            sleepMills(2000);
            //C已打印,將打印C的條件由原來的滿足變成不滿足
            isC = false;
            //將打印A的條件變成滿足
            isA = true;
            printA.signal();
        }finally {
            lock.unlock();
        }
    }

    private static void sleepMills(long mills) {
        try {
            TimeUnit.MILLISECONDS.sleep(mills);
        } catch (InterruptedException e) {
            System.out.println(e);
        }
    }
}

再來看一個交替打印AB的示例。這里給出了2種實現思路,一種是基於 volatile變量;另一種是采用條件隊列。對比了這2種實現之后,討論了條件隊列背后的原理(通知喚醒機制、線程調度、線程阻塞狀態……)

import java.util.concurrent.TimeUnit;

/**
 * @author psj
 * @date 20-3-7
 */
public class PrintAB {

    private volatile boolean isA = true;

    public static void main(String[] args) {
        PrintAB pab = new PrintAB();

        Thread t1 = new Thread(() -> {
            while (true) {
                pab.printA();
            }
        }, "t1");

        Thread t2 = new Thread(() -> {
            while (true) {
                pab.printB();
            }
        }, "t2");

        t2.start();
        t1.start();
    }

    public void printA() {
        if (isA) {
            System.out.println(Thread.currentThread().getName() + " print A");
            //模擬方法執行耗時
            sleepMills(1000);
            isA = false;
        }
    }

    public void printB() {
        if (!isA) {
            System.out.println(Thread.currentThread().getName() + " print B");
            sleepMills(2000);
            isA = true;
        }
    }

    private static void sleepMills(long mills) {
        try {
            TimeUnit.MILLISECONDS.sleep(mills);
        } catch (InterruptedException e) {
            System.out.println(e);
        }
    }
}

使用一個volatile變量協調2個線程交替打印A、B的順序。此種方式是很消耗CPU的,因為:2個線程是在while true循環中不停地測試打印條件是否成立。另一種優雅的方式則是采用通知喚醒機制:當條件不成立時,讓線程放棄cpu,掛起線程,進入阻塞狀態(WAITING),當條件成立后,再喚醒線程,讓它再次去爭搶cpu,執行打印。這可以通過條件隊列來實現,代碼如下:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author psj
 * @date 20-3-7
 */
public class PrintABCondition {
    private Lock lock = new ReentrantLock();
    private Condition pac = lock.newCondition();
    private Condition pbc = lock.newCondition();

    //決定打印A or 打印B 條件是否滿足
    private volatile boolean printA = true;

    public void printA() throws InterruptedException{
        try {
            lock.lock();
            while (!printA) {
                //打印A的條件未滿足,掛起線程,放棄cpu,進入WAITING狀態
                pac.await();
            }
            //打印A的條件滿足了,打印A
            System.out.println(Thread.currentThread().getName() + " print A");
            //模擬方法執行耗時
            sleepMills(1500);
            //A 已經打印完畢, 使得打印B的條件滿足, 接下來發送通知 喚醒打印B的線程
            printA = false;
            pbc.signal();
        }finally {
            lock.unlock();
        }
    }

    public void printB() throws InterruptedException{
        try {
            lock.lock();
            while (printA) {
                //打印B的條件未滿足,掛起線程,放棄cpu,進入WAITING狀態
                pbc.await();
            }
            System.out.println(Thread.currentThread().getName() + " print B");
            sleepMills(2000);
            //B 已打印完畢,使得打印A的條件滿足,接下來發送通知 喚醒打印A的線程
            printA = true;
            pac.signal();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        PrintABCondition pab = new PrintABCondition();

        Thread t1 = new Thread(() -> {
            while (true) {
                try {
                    pab.printA();
                } catch (InterruptedException e) {
                    //響應中斷
                    break;
                }
            }
        }, "t1");

        Thread t2 = new Thread(() -> {
            while (true) {
                try {
                    pab.printB();
                } catch (InterruptedException e) {
                    break;
                }
            }
        }, "t2");

        t2.start();
        t1.start();
    }

    private static void sleepMills(long mills) {
        try {
            TimeUnit.MILLISECONDS.sleep(mills);
        } catch (InterruptedException e) {
            System.out.println(e);
        }
    }
}

看juc並發包Condition.java的await方法里面有一段注釋:

In all cases, before this method can return the current thread must re-acquire the lock associated with this condition。When the thread returns it is guaranteed to hold this lock.

這里從打印A的線程角度來解釋一下:打印A的線程在從 await()方法返回時,必須重新爭搶鎖,爭搶到鎖之后,就會再執行while循環測試條件是否滿足,如果此時條件滿足(printA變為true)了,那就往下執行。如果條件不滿足(printA為false),那么就放棄cpu,進入WAITING狀態,等待喚醒。
從線程調度的角度來說,當執行Thread#start()后,線程從NEW狀態變成RUNNABLE狀態,此時線程具有運行的資格--可以被線程調度器選中占用cpu執行,但並不是說該線程一定占有cpu在運行了。由於"最小時間片"原則,每個線程一般都會占用cpu運行一小段時間,然后由於"搶占式調度",就被調度器切換出去了,線程不再占有cpu了(這種情形下的切換是多線程並發執行所固有的性質),與 "多個線程爭搶同一把鎖,未獲得鎖的線程被阻塞掛起,從而不再占有cpu了" 是不同的,要注意區分。

這里說一下為什么要在while循環里面測試條件,當條件不滿足時,調用await方法使得線程放棄cpu,進入WAITING狀態。為什么用while,if語句不可以嗎?
我覺得用while循環的原因是:其它線程可能“無意”間調用了singal()使得該線程被喚醒了(又或者是線程因為某種未知原因喚醒了),線程醒來之后需要重新測試條件是否滿足,所以只能用while循環。
實際上,await()底層是調用LockSupport#park(java.lang.Object)來掛起線程的,那看看該方法的注釋,想起一個問題:當一個線程被阻塞掛起時,有哪些方法可以讓它恢復執行?在開始討論之前,再次明確一下:所謂恢復執行,只是使得線程"醒過來"具有執行的資格,並不一定保證線程就拿到了cpu,正在運行了,記住:搶占式調度,是由線程調度器來決定將哪個cpu分配給線程運行的。
OK,我覺得主要有兩種方式喚醒線程,恢復執行。一種是"中斷",即線程通過響應 InterruptedException 異常,退出阻塞狀態;另一種是其它線程發送"通知",比如調用signal/signalAll方法(底層是調用LockSupport#unpark),使得線程退出阻塞狀態。
但是,看LockSupport#park方法的注釋,還提到了一種情況:

The call spuriously (that is, for no reason) returns.

這句話也驗證了,為什么只能用while循環(不能用if語句)來測試條件是否滿足(比如打印AB示例代碼中的 printA 條件變量)的一個原因,因為線程可能不知道什么原因被喚醒了,只有while循環才能保證線程醒來之后會重新測試條件是否滿足。

額外補充一下,這里為什么是線程阻塞后,是WAITING狀態,而不是BLOCKED狀態呢?哈哈。看 Thread.java 類的關於線程狀態描述的源碼注釋(hint:等待條件滿足)就知道了。

使用條件隊列的好處:

  • 通知喚醒機制,代碼高效
  • 能清楚看到線程在哪個條件上阻塞,並發邏輯清晰

參考資料:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM