Java多線程


多線程基礎

一、創建線程

僅僅只有new Thread這種方法創建線程

Java中無法銷毀一個線程,只能表現一個線程的狀態。

通過thread.start()啟動線程(僅僅只是通知線程啟動)

thread.join()用於控制線程必須執行完成,調整優先級並不能保證優先級高的線程先執行。

1、繼承Thread

  • 繼承Thread
  • 覆蓋run()方法
  • 直接調用Thread#start()執行
public class MyThread extends Thread{
    @Override
    public void run() {
        System.out.println("Thread");
    }
}

public class ThreadMain {
    public static void main(String[] args) {
        MyThread myThread = new MyThread();
        myThread.start();
    }
}

2、實現Runnable

  • 實現Runnable
  • 獲取實現Runnable接口的實例,作為參數,創建Thread實例
  • 執行Thread#start()啟動線程
public class RunnableThread implements Runnable{
    @Override
    public void run() {
        System.out.println("Runnable");
    }
}

public class ThreadMain {
    public static void main(String[] args) {
        Thread thread = new Thread(new RunnableThread());
        thread.start();
    }
}

3、CallableFuture創建線程

Callable相對於Runnable會有一個線程返回值

  • 實現Callable接口
  • Callable的實現類為參數,創建FutureTask實例
  • FutureTask作為Thread的參數,創建Thread實例
  • 通過Thread#start啟動線程
  • 通過FutureTask#get()阻塞獲取線程的返回值
public class CallableThread implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        System.out.println("Callable");
        return 90;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<Integer> futureTask = new FutureTask<Integer>(new CallableThread());
        Thread thread = new Thread(futureTask, "Thread1");
        thread.start();
        System.out.println(futureTask.get());
    }
}

4、使用線程池創建

public class MultiThreadStudy {
    public static void main(String[] args) {
        int corePoolSize = 5;
        int maximumPoolSize = 10;
        long keepAliveTime = 2;
        TimeUnit unit = TimeUnit.SECONDS;
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue <Runnable>(3);
        ThreadPoolExecutor threadPoolExecutor = null;

        /**
         * ThreadPoolExecutor 線程池,幾個參數的含義如下:
         * int corePoolSize,    核心線程數大小
         * int maximumPoolSize, 最大線程數
         * long keepAliveTime,  存活時間
         * TimeUnit unit,       keepAliveTime的時間單位
         * BlockingQueue<Runnable> workQueue,  存放待執行任務的隊列
         * RejectedExecutionHandler handler    拒絕策略
         */
        try {
            threadPoolExecutor =  new ThreadPoolExecutor(corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    unit,
                    workQueue,
                    new ThreadPoolExecutor.AbortPolicy());
            //循環提交任務
            for (int i = 0; i < 13; i++) {
                //提交任務的索引
                final int index = (i + 1);
                threadPoolExecutor.submit(() -> {
                    //線程打印輸出
                    System.out.println("大家好,我是線程:" + index);
                    try {
                        //模擬線程執行時間,10s
                        Thread.sleep(7000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
                //每個任務提交后休眠500ms再提交下一個任務,用於保證提交順序
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPoolExecutor.shutdown();
        }
    }
}

5、通過Java創建進程

public class ProcessCreationQuestion {

    public static void main(String[] args) throws IOException {

        // 獲取 Java Runtime
        Runtime runtime = Runtime.getRuntime();
        Process process = runtime.exec("cmd /k start http://www.baidu.com");
        process.exitValue();
    }

}

二、停止線程

無法真正停止一個線程,真正停止的只能是線程的邏輯。

請說明Thread interrupt()、isInterrupted()、interrupted()的區別以及意義?

Thread interrupt(): 設置狀態,調JVM的本地(native)interrupt0()方法。

    public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();

        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();  // Just to set the interrupt flag
                              //--> private native void interrupt0();
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }

isInterrupted() 調的是靜態方法isInterrupted(),當且僅當狀態設置為中斷時,返回false,並不清除狀態。

public static boolean interrupted() {
    return currentThread().isInterrupted(true);
}

public boolean isInterrupted() {
    return isInterrupted(false);
}

interrupted(): 私有本地方法,即判斷中斷狀態,又清除狀態。

 private native boolean isInterrupted(boolean ClearInterrupted);

三、線程異常

當線程遇到異常的時候,線程會掛

public class ThreadExceptionQuestion {

    public static void main(String[] args) throws InterruptedException {
        //...
        // main 線程 -> 子線程
        Thread t1 = new Thread(() -> {
            throw new RuntimeException("數據達到閾值");
        }, "t1");

        t1.start();
        // main 線程會中止嗎?
        t1.join();

        // Java Thread 是一個包裝,它由 GC 做垃圾回收
        // JVM Thread 可能是一個 OS Thread,JVM 管理,
        // 當線程執行完畢(正常或者異常)
        System.out.println(t1.isAlive());
    }
}

當線程遇到異常時,如何捕獲?

...
        Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
            System.out.printf("線程[%s] 遇到了異常,詳細信息:%s\n",
                    thread.getName(),
                    throwable.getMessage());
        });
...

當線程遇到異常時,ThreadPoolExecutor如何捕獲異常?

public class ThreadPoolExecutorExceptionQuestion {

    public static void main(String[] args) throws InterruptedException {

//        ExecutorService executorService = Executors.newFixedThreadPool(2);

        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                1,
                1,
                0,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>()
        ) {

            /**
             * 通過覆蓋 {@link ThreadPoolExecutor#afterExecute(Runnable, Throwable)} 達到獲取異常的信息
             * @param r
             * @param t
             */
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.printf("線程[%s] 遇到了異常,詳細信息:%s\n",
                        Thread.currentThread().getName(),
                        t.getMessage());
            }

        };

        executorService.execute(() -> {
            throw new RuntimeException("數據達到閾值");
        });

        // 等待一秒鍾,確保提交的任務完成
        executorService.awaitTermination(1, TimeUnit.SECONDS);

        // 關閉線程池
        executorService.shutdown();
    }
}

四、線程狀態

新建 -> 就緒 -> 阻塞 -> 運行 -> 死亡

獲取當前jvm所有的現場狀態

  • 可以直接使用jstack命令
  • ThreadMXBean
public class AllThreadStackQuestion {

    public static void main(String[] args) {
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        long[] threadIds = threadMXBean.getAllThreadIds();

        for (long threadId : threadIds) {
            ThreadInfo threadInfo = threadMXBean.getThreadInfo(threadId);
            System.out.println(threadInfo.toString());
        }

    }
}

獲取線程的資源消費情況

public class AllThreadInfoQuestion {

    public static void main(String[] args) {
        ThreadMXBean threadMXBean = (ThreadMXBean) ManagementFactory.getThreadMXBean();
        long[] threadIds = threadMXBean.getAllThreadIds();

        for (long threadId : threadIds) {
//            ThreadInfo threadInfo = threadMXBean.getThreadInfo(threadId);
//            System.out.println(threadInfo.toString());
            long bytes = threadMXBean.getThreadAllocatedBytes(threadId);
            long kBytes = bytes / 1024;
            System.out.printf("線程[ID:%d] 分配內存: %s KB\n", threadId, kBytes);
        }

    }
}

五、線程同步

synchronized

synchronized關鍵字在修飾方法與代碼塊中的區別:字節碼的區別,代碼塊用monitor,方法用synchronized關鍵字

synchronized關鍵字與ReentrantLock之間的區別:

  • 兩者都是可重入鎖
  • synchronized依賴於JVM,而ReentrantLock依賴於API
  • ReentrantLocksynchronized增加了一些高級功能
    • 等待可中斷
    • 可實現公平鎖
    • 可實現選擇性通知(鎖可以綁定多個條件)

偏向鎖只對synchronized有用,而ReentrantLock已經實現了偏向鎖.

六、線程通訊

wait()、notify()、notifyAll()方法是Object的本地final方法,無法被重寫

wait():獲得鎖的對象,釋放鎖,當前線程又被阻塞,等同於Java 5 LockSupport中的park方法。

notify():已經獲得鎖,喚起一個被阻塞的線程,等同於Java 5 LockSupport中的unpark方法

notifyAll():會讓所有處於等待池的線程全部進入鎖池去競爭獲取鎖的機會

七、線程退出

當主線程退出時,守護線程不一定執行完畢

public class DaemonThreadQuestion {

    public static void main(String[] args) {
        // main 線程
        Thread t1 = new Thread(() -> {
            System.out.println("Hello,World");
//            Thread currentThread = Thread.currentThread();
//            System.out.printf("線程[name : %s, daemon:%s]: Hello,World\n",
//                    currentThread.getName(),
//                    currentThread.isDaemon()
//            );
        }, "daemon");
        // 編程守候線程
        t1.setDaemon(true);
        t1.start();

        // 守候線程的執行依賴於執行時間(非唯一評判)
    }
}

ShutdownHook線程的使用場景,以及觸發執行

使用場景:SpringAbstractApplicationContextregisterShutdownHook()

public class ShutdownHookQuestion {

    public static void main(String[] args) {

        Runtime runtime = Runtime.getRuntime();

        runtime.addShutdownHook(new Thread(ShutdownHookQuestion::action, "Shutdown Hook Question"));

    }

    private static void action() {
        System.out.printf("線程[%s] 正在執行...\n", Thread.currentThread().getName());  // 2
    }
}

如何確保主線程退出前,所有線程執行完畢?

public class CompleteAllThreadsQuestion {

    public static void main(String[] args) throws InterruptedException {

        // main 線程 -> 子線程
        Thread t1 = new Thread(CompleteAllThreadsQuestion::action, "t1");
        Thread t2 = new Thread(CompleteAllThreadsQuestion::action, "t2");
        Thread t3 = new Thread(CompleteAllThreadsQuestion::action, "t3");

        // 不確定 t1、t2、t3 是否調用 start()

        t1.start();
        t2.start();
        t3.start();

        // 創建了 N Thread

        Thread mainThread = Thread.currentThread();
        // 獲取 main 線程組
        ThreadGroup threadGroup = mainThread.getThreadGroup();
        // 活躍的線程數
        int count = threadGroup.activeCount();
        Thread[] threads = new Thread[count];
        // 把所有的線程復制 threads 數組
        threadGroup.enumerate(threads, true);

        for (Thread thread : threads) {
            System.out.printf("當前活躍線程: %s\n", thread.getName());
        }
    }

    private static void action() {
        System.out.printf("線程[%s] 正在執行...\n", Thread.currentThread().getName());  // 2
    }

}

J.U.C並發集合框架

public class ThreadSafeCollectionQuestion {

    public static void main(String[] args) {

        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

        Set<Integer> set = Set.of(1, 2, 3, 4, 5);

        Map<Integer, String> map = Map.of(1, "A");

        // 以上實現都是不變對象,不過第一個除外

        // 通過 Collections#sychronized* 方法返回

        // Wrapper 設計模式(所有的方法都被 synchronized 同步或互斥)
        list = Collections.synchronizedList(list);

        set = Collections.synchronizedSet(set);

        map = Collections.synchronizedMap(map);
        
        list = new CopyOnWriteArrayList<>(list);
        set = new CopyOnWriteArraySet<>(set);
        map = new ConcurrentHashMap<>(map);

    }
}

一、List部分

Vector:底層方法使用synchronized進行同步操作,是List的實現。

CopyOnWriteArrayList:讀的時候不加鎖,弱一致性,添加數據的時候使用的是Arrays.copy()方法進行數據拷貝,和synchronized來保證線程的安全性。

SynchronizedList:底層方法使用synchronized進行同步操作,返回list,實現原理方式是Wrapper實現

二、Set部分

SynchronizedSet:底層方法使用synchronized進行同步操作

CopyOnWriteArraySet:底層借助CopyOnWriteArrayList數組實現。

ConcurrentSkipListSet:

當 Set#iterator() 方法返回 Iterator 對象后,能否在其迭代中,給 Set 對象添加新的元素?

不一定;Set 在傳統實現中,會有fail-fast問題;而在J.U.C中會出現弱一致性,對數據的一致性要求較低,是可以給 Set 對象添加新的元素。

三、Map部分

Hashtable:底層方法使用synchronized進行同步操作,同步粒度較大。

ConcurrentHashMap:8以后底層使用CASsynchronized進行同步操作,數據結構為數組 + 鏈表 + 紅黑樹

ConcurrentSkipListMap

SynchronizedMap:底層方法使用synchronized進行同步操作

請說明 ConcurrentHashMap 與 ConcurrentSkipListMap 各自的優勢與不足?

在 java 6 和 8 中,ConcurrentHashMap 寫的時候,是加鎖的,所以內存占得比較小,而 ConcurrentSkipListMap 寫的時候是不加鎖的,內存占得相對比較大,通過空間換取時間上的成本,速度較快,但比前者要慢,ConcurrentHashMap 基本上是常量時間。ConcurrentSkipListMap 讀和寫都是\(log N\)實現,高性能相對穩定。

四、Queue部分

請說明 BlockingQueue 與 Queue 的區別?

BlockingQueue 繼承了 Queue 的實現;put 方法中有個阻塞的操作(InterruptedException),當隊列滿的時候,put 會被阻塞;當隊列空的時候,put方法可用。take 方法中,當數據存在時,才可以返回,否則為空。

請說明 LinkedBlockingQueue 與 ArrayBlockingQueue 的區別?

LinkedBlockingQueue 是鏈表結構;有兩個構造器,一個是(Integer.MAX_VALUE),無邊界,另一個是(int capacity),有邊界;ArrayBlockingQueue 是數組結構;有邊界。

請說明 LinkedTransferQueue 與 LinkedBlockingQueue 的區別?

LinkedTransferQueue 是java 7中提供的新接口,性能比后者更優化。

五、PriorityBlockingQueue

public class PriorityBlockingQueueQuiz{
    public static void main(String[] args) throw Exception {
        BlockingQueue<Integer> queue = new PriorityBlockingQueue<>(2);
        // 1. PriorityBlockingQueue put(Object) 方法不阻塞,不拋異常
        // 2. PriorityBlockingQueue offer(Object) 方法不限制,允許長度變長
        // 3. PriorityBlockingQueue 插入對象會做排序,默認參照元素 Comparable 實現,
        //    或者顯示地傳遞 Comparator
        queue.put(9);
        queue.put(1);
        queue.put(8);
        System.out.println("queue.size() =" + queue.size());
        System.out.println("queue.take() =" + queue.take());
        System.out.println("queue =" + queue);
    }
}

/**
queue.size() =3
queue.take() =1
queue =[8, 9]
*/

六、SynchronusQueue

public class SynchronusQueueQuiz{
    
    public static void main(String[] args) throws Exception {
        BlockingQueue<Integer> queue = new SynchronousQueue<>();
        // 1. SynchronousQueue 是無空間,offer 永遠返回 false
        // 2. SynchronousQueue take() 方法會被阻塞,必須被其他線程顯示地調用 put(Object);
        System.out.pringln("queue.offer(1) = " + queue.offer(1));
        System.out.pringln("queue.offer(2) = " + queue.offer(2));
        System.out.pringln("queue.offer(3) = " + queue.offer(3));
        System.out.println("queue.take() = " + queue.take());
        System.out.println("queue.size = " + queue.size());
    }
}

七、BlockingQueue offer()

請評估以下程序的運行結果?

public class BlockingQueueQuiz{
    public static void main(String[] args) throws Exception {
        offer(new ArrayBlockingQueue<>(2));
        offer(new LinkedBlockingQueue<>(2));
        offer(new PriorityBlockingQueue<>(2));
        offer(new SynchronousQueue<>());
    }
}

private static void offer(BlockingQueue<Integer> queue) throws Exception {
    System.out.println("queue.getClass() = " +queue.getClass().getName());
    System.out.println("queue.offer(1) = " + queue.offer(1));
    System.out.println("queue.offer(2) = " + queue.offer(2));
    System.out.println("queue.offer(3) = " + queue.offer(3));
    System.out.println("queue.size() = " + queue.size());
    System.out.println("queue.take() = " + queue.take());
    }
}

Java並發框架

一、鎖Lock

鎖主要提供了兩種特性:互斥性和不可見性

請說明ReentranLockReentrantReadWriteLock的區別?

jdk 1.5以后,ReentranLock(重進入鎖)與ReentrantReadWriteLock都是可重進入的鎖,ReentranLock 都是互斥的,而ReentrantReadWriteLock是共享的,其中里面有兩個類,一個是 ReadLock(共享,並行,強調數據一致性或者說可見性),另一個是 WriteLock(互斥,串行)。

請解釋ReentrantLock為什么命名為重進入?

public class ReentrantLockQuestion {

    /**
     * T1 , T2 , T3
     *
     * T1(lock) , T2(park), T3(park)
     * Waited Queue -> Head-> T2 next -> T3
     * T1(unlock) -> unpark all
     * Waited Queue -> Head-> T2 next -> T3
     * T2(free), T3(free)
     *
     * -> T2(lock) , T3(park)
     * Waited Queue -> Head-> T3
     * T2(unlock) -> unpark all
     * T3(free)
     */
    private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        // thread[main] ->
        // lock     lock           lock
        // main -> action1() -> action2() -> action3()
        synchronizedAction(ReentrantLockQuestion::action1);
    }


    private static void action1() {
        synchronizedAction(ReentrantLockQuestion::action2);
    }

    private static void action2() {
        synchronizedAction(ReentrantLockQuestion::action3);
    }

    private static void action3() {
        System.out.println("Hello,World");
    }

    private static void synchronizedAction(Runnable runnable) {
        lock.lock();
        try {
            runnable.run();
        } finally {
            lock.unlock();
        }
    }
}

請說明 Lock#lock() 與 Lock#lockInterruptibly() 的區別?

    /**
     * java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued
     * 如果當前線程已被其他線程調用了 interrupt() 方法時,這時會返回 true
     * acquireQueued 執行完時,interrupt 清空(false)
     * 再通過 selfInterrupt() 方法將狀態恢復(interrupt=true)
     */
     public static void main(String[] args) {
         lockVsLockInterruptibly();
     }
     
        private static void lockVsLockInterruptibly() {

        try {
            lock.lockInterruptibly();
            action1();
        } catch (InterruptedException e) {
            // 顯示地恢復中斷狀態
            Thread.currentThread().interrupt();
            // 當前線程並未消亡,線程池可能還在存活
        } finally {
            lock.unlock();
        }
    }

lock() 優先考慮獲取鎖,待獲取鎖成功后,才響應中斷。

**lockInterruptibly() ** 優先考慮響應中斷,而不是響應鎖的普通獲取或重入獲取。

ReentrantLock.lockInterruptibly 允許在等待時由其它線程調用等待線程的 Thread.interrupt 方法來中斷等待線程的等待而直接返回,這時不用獲取鎖,而會拋出一個 InterruptedException。

ReentrantLock.lock 方法不允許 Thread.interrupt 中斷,即使檢測到 Thread.isInterrupted ,一樣會繼續嘗試獲取鎖,失敗則繼續休眠。只是在最后獲取鎖成功后再把當前線程置為 interrupted 狀態,然后再中斷線程。

二、條件變量

Condition使用場景

  1. CountDownLatch(condition變種)
  2. CyclicBarrier(循環屏障)
  3. 信號量/燈(Semaphore)java9
  4. 生產者和消費者
  5. 阻塞隊列

請解釋 Condition await() 和 signal() 與 Object wait () 和 notify() 的相同與差異?

相同:阻塞和釋放

差異:Java Thread 對象和實際 JVM 執行的 OS Thread 不是相同對象,JVM Thread 回調 Java Thread.run() 方法,同時 Thread 提供一些 native 方法來獲取 JVM Thread 狀態,當JVM thread 執行后,自動 notify()了。

三、屏障Barriers

請說明 CountDownLatch 與 CyclicBarrier 的區別?

CountDownLatch : 不可循環的,一次性操作(倒計時)。

public class CountDownLatchQuestion {

    public static void main(String[] args) throws InterruptedException {

        // 倒數計數 5
        CountDownLatch latch = new CountDownLatch(5);

        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 4; i++) {
            executorService.submit(() -> {
                action();
                latch.countDown(); // -1
            });
        }

        // 等待完成
        // 當計數 > 0,會被阻塞
        latch.await();

        System.out.println("Done");

        // 關閉線程池
        executorService.shutdown();
    }

    private static void action() {
        System.out.printf("線程[%s] 正在執行...\n", Thread.currentThread().getName());  // 2
    }

}

CyclicBarrier:可循環的, 先計數 -1,再判斷當計數 > 0 時候,才阻塞。

public class CyclicBarrierQuestion {

    public static void main(String[] args) throws InterruptedException {

        CyclicBarrier barrier = new CyclicBarrier(5); // 5

        ExecutorService executorService = Executors.newFixedThreadPool(5); // 3

        for (int i = 0; i < 20; i++) {
            executorService.submit(() -> {
                action();
                try {
                    // CyclicBarrier.await() = CountDownLatch.countDown() + await()
                    // 先計數 -1,再判斷當計數 > 0 時候,才阻塞
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }

        // 盡可能不要執行完成再 reset
        // 先等待 3 ms
        executorService.awaitTermination(3, TimeUnit.MILLISECONDS);
        // 再執行 CyclicBarrier reset
        // reset 方法是一個廢操作
        barrier.reset();

        System.out.println("Done");

        // 關閉線程池
        executorService.shutdown();
    }

    private static void action() {
        System.out.printf("線程[%s] 正在執行...\n", Thread.currentThread().getName());  // 2
    }

}

請說明 Semaphore(信號量/燈) 的使用場景?

Semaphore 和Lock類似,比Lock靈活。其中有 acquire() 和 release() 兩種方法,arg 都等於 1。acquire() 會拋出 InterruptedException,同時從 sync.acquireSharedInterruptibly(arg:1)可以看出是讀模式(shared); release()中可以計數,可以控制數量,permits可以傳遞N個數量。

public class SemaphoreTest {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 5; i++) {
            new SecurityCheckThread(i, semaphore).start();
        }
    }

    private static class SecurityCheckThread extends Thread{
        private int seq;
        private Semaphore semaphore;

        public SecurityCheckThread(int seq, Semaphore semaphore){
            this.seq = seq;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println("No." + seq + "乘客,正在查驗中....");

                if (seq%2 == 0){
                    Thread.sleep(1000);
                    System.out.println("No." + seq + "乘客,身份可疑,不能出國!");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
                System.out.println("No." + seq + "乘客已完成服務。");
            }
        }
    }
}

CountDownLatch實現

public class LegacyCountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {

        // 倒數計數 5
        MyCountDownLatch latch = new MyCountDownLatch(5);

        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 5; i++) {
            executorService.submit(() -> {
                action();
                latch.countDown(); // -1
            });
        }

        // 等待完成
        // 當計數 > 0,會被阻塞
        latch.await();

        System.out.println("Done");

        // 關閉線程池
        executorService.shutdown();
    }

    private static void action() {
        System.out.printf("線程[%s] 正在執行...\n", Thread.currentThread().getName());  // 2
    }

    /**
     * Java 1.5+ Lock 實現
     */
    private static class MyCountDownLatch {

        private int count;

        private final Lock lock = new ReentrantLock();

        private final Condition condition = lock.newCondition();

        private MyCountDownLatch(int count) {
            this.count = count;
        }

        public void await() throws InterruptedException {
            // 當 count > 0 等待
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }

            lock.lock();
            try {
                while (count > 0) {
                    condition.await(); // 阻塞當前線程
                }
            } finally {
                lock.unlock();
            }
        }

        public void countDown() {

            lock.lock();
            try {
                if (count < 1) {
                    return;
                }
                count--;
                if (count < 1) { // 當數量減少至0時,喚起被阻塞的線程
                    condition.signalAll();
                }
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Java < 1.5 實現
     */
    private static class LegacyCountDownLatch {

        private int count;

        private LegacyCountDownLatch(int count) {
            this.count = count;
        }

        public void await() throws InterruptedException {
            // 當 count > 0 等待
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }

            synchronized (this) {
                while (count > 0) {
                    wait(); // 阻塞當前線程
                }
            }
        }

        public void countDown() {
            synchronized (this) {
                if (count < 1) {
                    return;
                }
                count--;
                if (count < 1) { // 當數量減少至0時,喚起被阻塞的線程
                    notifyAll();
                }
            }
        }
    }
}

四、線程池

請問J.U.C中內建了幾種ExceptionService實現?

1.5:ThreadPoolExecutor、ScheduledThreadPoolExecutor

1.7:ForkJoinPool

public class ExecutorServiceQuestion {

    public static void main(String[] args) {
        /**
         * 1.5
         *  ThreadPoolExecutor
         *  ScheduledThreadPoolExecutor :: ThreadPoolExecutor
         * 1.7
         *  ForkJoinPool
         */
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService = Executors.newScheduledThreadPool(2);

        // executorService 不再被引用,它會被 GC -> finalize() -> shutdown()
        ExecutorService executorService2 = Executors.newSingleThreadExecutor();
    }
}

如何獲取 ThreadPoolExecutor 正在運行的線程?

public class ThreadPoolExecutorThreadQuestion {

    public static void main(String[] args) throws InterruptedException {

        // main 線程啟動子線程,子線程的創造來自於 Executors.defaultThreadFactory()

        ExecutorService executorService = Executors.newCachedThreadPool();
        // 之前了解 ThreadPoolExecutor beforeExecute 和 afterExecute 能夠獲取當前線程數量

        Set<Thread> threadsContainer = new HashSet<>();

        setThreadFactory(executorService, threadsContainer);
        for (int i = 0; i < 9; i++) { // 開啟 9 個線程
            executorService.submit(() -> {
            });
        }

        // 線程池等待執行 3 ms
        executorService.awaitTermination(3, TimeUnit.MILLISECONDS);

        threadsContainer.stream()
                .filter(Thread::isAlive)
                .forEach(thread -> {
                    System.out.println("線程池創造的線程 : " + thread);
                });

        Thread mainThread = Thread.currentThread();

        ThreadGroup mainThreadGroup = mainThread.getThreadGroup();

        int count = mainThreadGroup.activeCount();
        Thread[] threads = new Thread[count];
        mainThreadGroup.enumerate(threads, true);

        Stream.of(threads)
                .filter(Thread::isAlive)
                .forEach(thread -> {
                    System.out.println("線程 : " + thread);
                });

        // 關閉線程池
        executorService.shutdown();
    }

    private static void setThreadFactory(ExecutorService executorService, Set<Thread> threadsContainer) {

        if (executorService instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
            ThreadFactory oldThreadFactory = threadPoolExecutor.getThreadFactory();
            threadPoolExecutor.setThreadFactory(new DelegatingThreadFactory(oldThreadFactory, threadsContainer));
        }
    }

    private static class DelegatingThreadFactory implements ThreadFactory {

        private final ThreadFactory delegate;

        private final Set<Thread> threadsContainer;

        private DelegatingThreadFactory(ThreadFactory delegate, Set<Thread> threadsContainer) {
            this.delegate = delegate;
            this.threadsContainer = threadsContainer;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = delegate.newThread(r);
            // cache thread
            threadsContainer.add(thread);
            return thread;
        }
    }
}

五、Future

如何獲取 Future 對象?

submit()

請舉例 Future get() 以及 get(Long,TimeUnit) 方法的使用場景?

  • 超時等待
  • InterruptedException
  • ExcutionException
  • TimeOutException

如何利用 Future 優雅地取消一個任務的執行?

public class CancellableFutureQuestion {

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Future future = executorService.submit(() -> { // 3秒內執行完成,才算正常
            action(5);
        });

        try {
            future.get(3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Thread 恢復中斷狀態
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            // 執行超時,適當地關閉
            Thread.currentThread().interrupt(); // 設置中斷狀態
            future.cancel(true); // 嘗試取消
        }

        executorService.shutdown();
    }

    private static void action(int seconds) {
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(seconds)); // 5 - 3
            // seconds - timeout = 剩余執行時間
            if (Thread.interrupted()) { // 判斷並且清除中斷狀態
                return;
            }
            action();
        } catch (InterruptedException e) {
        }
    }

    private static void action() {
        System.out.printf("線程[%s] 正在執行...\n", Thread.currentThread().getName());  // 2
    }
}

六、Volatile

在 Java 中,volatile 保證的是可見性還是原子性?

volatile 既有可見性又有原子性(非我及彼),可見性是一定的,原子性是看情況的。對象類型和原生類型都是可見性,原生類型是原子性。

在 Java 中,volatile long 和 double 是線程安全的嗎?

volatile long 和 double 是線程安全的。

在 Java 中,volatile 底層實現是基於什么機制?

內存屏障(變量 Lock)機制:一個變量的原子性的保證。

七、原子操作Atomic

為什么 AtomicBoolean 內部變量使用 int 實現,而非 boolean?

操作系統有 X86 和 X64,在虛擬機中,基於boolean 實現就是用 int 實現的,用哪一種實現都可以。虛擬機只有32位和64位的,所以用32位的實現。

在變量原子操作時,Atomic CAS 操作比 synchronized 關鍵字哪個更重?*

Synchronization

同線程的時候,synchronized 更快;而多線程的時候則要分情況討論。

public class AtomicQuestion {

    private static int actualValue = 3;

    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(3);
        // if( value == 3 )
        //     value = 5
        atomicInteger.compareAndSet(3, 5);
        // 偏向鎖 < CAS 操作 < 重鎖(完全互斥)
        // CAS 操作也是相對重的操作,它也是實現 synchronized 瘦鎖(thin lock)的關鍵
        // 偏向鎖就是避免 CAS(Compare And Set/Swap)操作
    }

    private synchronized static void compareAndSet(int expectedValue, int newValue) {
        if (actualValue == expectedValue) {
            actualValue = newValue;
        }
    }
}

Atomic CAS 的底層是如何實現?*

匯編指令:cpmxchg (Compare and Exchange)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM