多線程基礎
一、創建線程
僅僅只有new Thread
這種方法創建線程
Java中無法銷毀一個線程,只能表現一個線程的狀態。
通過thread.start()
啟動線程(僅僅只是通知線程啟動)
thread.join()
用於控制線程必須執行完成,調整優先級並不能保證優先級高的線程先執行。
1、繼承Thread
- 繼承
Thread
- 覆蓋
run()
方法 - 直接調用
Thread#start()
執行
public class MyThread extends Thread{
@Override
public void run() {
System.out.println("Thread");
}
}
public class ThreadMain {
public static void main(String[] args) {
MyThread myThread = new MyThread();
myThread.start();
}
}
2、實現Runnable
- 實現
Runnable
- 獲取實現
Runnable
接口的實例,作為參數,創建Thread
實例 - 執行
Thread#start()
啟動線程
public class RunnableThread implements Runnable{
@Override
public void run() {
System.out.println("Runnable");
}
}
public class ThreadMain {
public static void main(String[] args) {
Thread thread = new Thread(new RunnableThread());
thread.start();
}
}
3、Callable
和Future
創建線程
Callable
相對於Runnable
會有一個線程返回值
- 實現
Callable
接口 - 以
Callable
的實現類為參數,創建FutureTask
實例 - 將
FutureTask
作為Thread
的參數,創建Thread
實例 - 通過
Thread#start
啟動線程 - 通過
FutureTask#get()
阻塞獲取線程的返回值
public class CallableThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("Callable");
return 90;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<Integer>(new CallableThread());
Thread thread = new Thread(futureTask, "Thread1");
thread.start();
System.out.println(futureTask.get());
}
}
4、使用線程池創建
public class MultiThreadStudy {
public static void main(String[] args) {
int corePoolSize = 5;
int maximumPoolSize = 10;
long keepAliveTime = 2;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue <Runnable>(3);
ThreadPoolExecutor threadPoolExecutor = null;
/**
* ThreadPoolExecutor 線程池,幾個參數的含義如下:
* int corePoolSize, 核心線程數大小
* int maximumPoolSize, 最大線程數
* long keepAliveTime, 存活時間
* TimeUnit unit, keepAliveTime的時間單位
* BlockingQueue<Runnable> workQueue, 存放待執行任務的隊列
* RejectedExecutionHandler handler 拒絕策略
*/
try {
threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
new ThreadPoolExecutor.AbortPolicy());
//循環提交任務
for (int i = 0; i < 13; i++) {
//提交任務的索引
final int index = (i + 1);
threadPoolExecutor.submit(() -> {
//線程打印輸出
System.out.println("大家好,我是線程:" + index);
try {
//模擬線程執行時間,10s
Thread.sleep(7000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//每個任務提交后休眠500ms再提交下一個任務,用於保證提交順序
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}
5、通過Java
創建進程
public class ProcessCreationQuestion {
public static void main(String[] args) throws IOException {
// 獲取 Java Runtime
Runtime runtime = Runtime.getRuntime();
Process process = runtime.exec("cmd /k start http://www.baidu.com");
process.exitValue();
}
}
二、停止線程
無法真正停止一個線程,真正停止的只能是線程的邏輯。
請說明Thread interrupt()、isInterrupted()、interrupted()
的區別以及意義?
Thread interrupt(): 設置狀態,調JVM的本地(native)interrupt0()
方法。
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
//--> private native void interrupt0();
b.interrupt(this);
return;
}
}
interrupt0();
}
isInterrupted()
: 調的是靜態方法isInterrupted()
,當且僅當狀態設置為中斷時,返回false
,並不清除狀態。
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
public boolean isInterrupted() {
return isInterrupted(false);
}
interrupted(): 私有本地方法,即判斷中斷狀態,又清除狀態。
private native boolean isInterrupted(boolean ClearInterrupted);
三、線程異常
當線程遇到異常的時候,線程會掛
public class ThreadExceptionQuestion {
public static void main(String[] args) throws InterruptedException {
//...
// main 線程 -> 子線程
Thread t1 = new Thread(() -> {
throw new RuntimeException("數據達到閾值");
}, "t1");
t1.start();
// main 線程會中止嗎?
t1.join();
// Java Thread 是一個包裝,它由 GC 做垃圾回收
// JVM Thread 可能是一個 OS Thread,JVM 管理,
// 當線程執行完畢(正常或者異常)
System.out.println(t1.isAlive());
}
}
當線程遇到異常時,如何捕獲?
...
Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
System.out.printf("線程[%s] 遇到了異常,詳細信息:%s\n",
thread.getName(),
throwable.getMessage());
});
...
當線程遇到異常時,ThreadPoolExecutor
如何捕獲異常?
public class ThreadPoolExecutorExceptionQuestion {
public static void main(String[] args) throws InterruptedException {
// ExecutorService executorService = Executors.newFixedThreadPool(2);
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
1,
1,
0,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()
) {
/**
* 通過覆蓋 {@link ThreadPoolExecutor#afterExecute(Runnable, Throwable)} 達到獲取異常的信息
* @param r
* @param t
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.printf("線程[%s] 遇到了異常,詳細信息:%s\n",
Thread.currentThread().getName(),
t.getMessage());
}
};
executorService.execute(() -> {
throw new RuntimeException("數據達到閾值");
});
// 等待一秒鍾,確保提交的任務完成
executorService.awaitTermination(1, TimeUnit.SECONDS);
// 關閉線程池
executorService.shutdown();
}
}
四、線程狀態
新建 -> 就緒 -> 阻塞 -> 運行 -> 死亡
獲取當前jvm
所有的現場狀態
- 可以直接使用
jstack
命令 ThreadMXBean
public class AllThreadStackQuestion {
public static void main(String[] args) {
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
long[] threadIds = threadMXBean.getAllThreadIds();
for (long threadId : threadIds) {
ThreadInfo threadInfo = threadMXBean.getThreadInfo(threadId);
System.out.println(threadInfo.toString());
}
}
}
獲取線程的資源消費情況
public class AllThreadInfoQuestion {
public static void main(String[] args) {
ThreadMXBean threadMXBean = (ThreadMXBean) ManagementFactory.getThreadMXBean();
long[] threadIds = threadMXBean.getAllThreadIds();
for (long threadId : threadIds) {
// ThreadInfo threadInfo = threadMXBean.getThreadInfo(threadId);
// System.out.println(threadInfo.toString());
long bytes = threadMXBean.getThreadAllocatedBytes(threadId);
long kBytes = bytes / 1024;
System.out.printf("線程[ID:%d] 分配內存: %s KB\n", threadId, kBytes);
}
}
}
五、線程同步
synchronized
synchronized
關鍵字在修飾方法與代碼塊中的區別:字節碼的區別,代碼塊用monitor
,方法用synchronized
關鍵字
synchronized
關鍵字與ReentrantLock
之間的區別:
- 兩者都是可重入鎖
synchronized
依賴於JVM
,而ReentrantLock
依賴於API
ReentrantLock
比synchronized
增加了一些高級功能- 等待可中斷
- 可實現公平鎖
- 可實現選擇性通知(鎖可以綁定多個條件)
偏向鎖只對synchronized
有用,而ReentrantLock
已經實現了偏向鎖.
六、線程通訊
wait()、notify()、notifyAll()方法是Object的本地final方法,無法被重寫
wait()
:獲得鎖的對象,釋放鎖,當前線程又被阻塞,等同於Java 5 LockSupport
中的park
方法。
notify()
:已經獲得鎖,喚起一個被阻塞的線程,等同於Java 5 LockSupport
中的unpark
方法
notifyAll()
:會讓所有處於等待池的線程全部進入鎖池去競爭獲取鎖的機會
七、線程退出
當主線程退出時,守護線程不一定執行完畢
public class DaemonThreadQuestion {
public static void main(String[] args) {
// main 線程
Thread t1 = new Thread(() -> {
System.out.println("Hello,World");
// Thread currentThread = Thread.currentThread();
// System.out.printf("線程[name : %s, daemon:%s]: Hello,World\n",
// currentThread.getName(),
// currentThread.isDaemon()
// );
}, "daemon");
// 編程守候線程
t1.setDaemon(true);
t1.start();
// 守候線程的執行依賴於執行時間(非唯一評判)
}
}
ShutdownHook
線程的使用場景,以及觸發執行
使用場景:Spring
中AbstractApplicationContext
的registerShutdownHook()
public class ShutdownHookQuestion {
public static void main(String[] args) {
Runtime runtime = Runtime.getRuntime();
runtime.addShutdownHook(new Thread(ShutdownHookQuestion::action, "Shutdown Hook Question"));
}
private static void action() {
System.out.printf("線程[%s] 正在執行...\n", Thread.currentThread().getName()); // 2
}
}
如何確保主線程退出前,所有線程執行完畢?
public class CompleteAllThreadsQuestion {
public static void main(String[] args) throws InterruptedException {
// main 線程 -> 子線程
Thread t1 = new Thread(CompleteAllThreadsQuestion::action, "t1");
Thread t2 = new Thread(CompleteAllThreadsQuestion::action, "t2");
Thread t3 = new Thread(CompleteAllThreadsQuestion::action, "t3");
// 不確定 t1、t2、t3 是否調用 start()
t1.start();
t2.start();
t3.start();
// 創建了 N Thread
Thread mainThread = Thread.currentThread();
// 獲取 main 線程組
ThreadGroup threadGroup = mainThread.getThreadGroup();
// 活躍的線程數
int count = threadGroup.activeCount();
Thread[] threads = new Thread[count];
// 把所有的線程復制 threads 數組
threadGroup.enumerate(threads, true);
for (Thread thread : threads) {
System.out.printf("當前活躍線程: %s\n", thread.getName());
}
}
private static void action() {
System.out.printf("線程[%s] 正在執行...\n", Thread.currentThread().getName()); // 2
}
}
J.U.C
並發集合框架
public class ThreadSafeCollectionQuestion {
public static void main(String[] args) {
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
Set<Integer> set = Set.of(1, 2, 3, 4, 5);
Map<Integer, String> map = Map.of(1, "A");
// 以上實現都是不變對象,不過第一個除外
// 通過 Collections#sychronized* 方法返回
// Wrapper 設計模式(所有的方法都被 synchronized 同步或互斥)
list = Collections.synchronizedList(list);
set = Collections.synchronizedSet(set);
map = Collections.synchronizedMap(map);
list = new CopyOnWriteArrayList<>(list);
set = new CopyOnWriteArraySet<>(set);
map = new ConcurrentHashMap<>(map);
}
}
一、List部分
Vector
:底層方法使用synchronized
進行同步操作,是List
的實現。
CopyOnWriteArrayList
:讀的時候不加鎖,弱一致性,添加數據的時候使用的是Arrays.copy()
方法進行數據拷貝,和synchronized
來保證線程的安全性。
SynchronizedList
:底層方法使用synchronized
進行同步操作,返回list
,實現原理方式是Wrapper
實現
二、Set部分
SynchronizedSet
:底層方法使用synchronized
進行同步操作
CopyOnWriteArraySet
:底層借助CopyOnWriteArrayList
數組實現。
ConcurrentSkipListSet
:
當 Set#iterator() 方法返回 Iterator 對象后,能否在其迭代中,給 Set 對象添加新的元素?
不一定;Set 在傳統實現中,會有fail-fast
問題;而在J.U.C
中會出現弱一致性,對數據的一致性要求較低,是可以給 Set 對象添加新的元素。
三、Map部分
Hashtable
:底層方法使用synchronized
進行同步操作,同步粒度較大。
ConcurrentHashMap
:8以后底層使用CAS
和synchronized
進行同步操作,數據結構為數組 + 鏈表 + 紅黑樹
ConcurrentSkipListMap
:
SynchronizedMap
:底層方法使用synchronized
進行同步操作
請說明 ConcurrentHashMap 與 ConcurrentSkipListMap 各自的優勢與不足?
在 java 6 和 8 中,ConcurrentHashMap 寫的時候,是加鎖的,所以內存占得比較小,而 ConcurrentSkipListMap 寫的時候是不加鎖的,內存占得相對比較大,通過空間換取時間上的成本,速度較快,但比前者要慢,ConcurrentHashMap 基本上是常量時間。ConcurrentSkipListMap 讀和寫都是\(log N\)實現,高性能相對穩定。
四、Queue部分
請說明 BlockingQueue 與 Queue 的區別?
BlockingQueue 繼承了 Queue 的實現;put 方法中有個阻塞的操作(InterruptedException),當隊列滿的時候,put 會被阻塞;當隊列空的時候,put方法可用。take 方法中,當數據存在時,才可以返回,否則為空。
請說明 LinkedBlockingQueue 與 ArrayBlockingQueue 的區別?
LinkedBlockingQueue 是鏈表結構;有兩個構造器,一個是(Integer.MAX_VALUE),無邊界,另一個是(int capacity),有邊界;ArrayBlockingQueue 是數組結構;有邊界。
請說明 LinkedTransferQueue 與 LinkedBlockingQueue 的區別?
LinkedTransferQueue 是java 7中提供的新接口,性能比后者更優化。
五、PriorityBlockingQueue
public class PriorityBlockingQueueQuiz{
public static void main(String[] args) throw Exception {
BlockingQueue<Integer> queue = new PriorityBlockingQueue<>(2);
// 1. PriorityBlockingQueue put(Object) 方法不阻塞,不拋異常
// 2. PriorityBlockingQueue offer(Object) 方法不限制,允許長度變長
// 3. PriorityBlockingQueue 插入對象會做排序,默認參照元素 Comparable 實現,
// 或者顯示地傳遞 Comparator
queue.put(9);
queue.put(1);
queue.put(8);
System.out.println("queue.size() =" + queue.size());
System.out.println("queue.take() =" + queue.take());
System.out.println("queue =" + queue);
}
}
/**
queue.size() =3
queue.take() =1
queue =[8, 9]
*/
六、SynchronusQueue
public class SynchronusQueueQuiz{
public static void main(String[] args) throws Exception {
BlockingQueue<Integer> queue = new SynchronousQueue<>();
// 1. SynchronousQueue 是無空間,offer 永遠返回 false
// 2. SynchronousQueue take() 方法會被阻塞,必須被其他線程顯示地調用 put(Object);
System.out.pringln("queue.offer(1) = " + queue.offer(1));
System.out.pringln("queue.offer(2) = " + queue.offer(2));
System.out.pringln("queue.offer(3) = " + queue.offer(3));
System.out.println("queue.take() = " + queue.take());
System.out.println("queue.size = " + queue.size());
}
}
七、BlockingQueue offer()
請評估以下程序的運行結果?
public class BlockingQueueQuiz{
public static void main(String[] args) throws Exception {
offer(new ArrayBlockingQueue<>(2));
offer(new LinkedBlockingQueue<>(2));
offer(new PriorityBlockingQueue<>(2));
offer(new SynchronousQueue<>());
}
}
private static void offer(BlockingQueue<Integer> queue) throws Exception {
System.out.println("queue.getClass() = " +queue.getClass().getName());
System.out.println("queue.offer(1) = " + queue.offer(1));
System.out.println("queue.offer(2) = " + queue.offer(2));
System.out.println("queue.offer(3) = " + queue.offer(3));
System.out.println("queue.size() = " + queue.size());
System.out.println("queue.take() = " + queue.take());
}
}
Java
並發框架
一、鎖Lock
鎖主要提供了兩種特性:互斥性和不可見性
請說明ReentranLock
與ReentrantReadWriteLock
的區別?
jdk 1.5
以后,ReentranLock
(重進入鎖)與ReentrantReadWriteLock
都是可重進入的鎖,ReentranLock
都是互斥的,而ReentrantReadWriteLock
是共享的,其中里面有兩個類,一個是 ReadLock
(共享,並行,強調數據一致性或者說可見性),另一個是 WriteLock
(互斥,串行)。
請解釋ReentrantLock
為什么命名為重進入?
public class ReentrantLockQuestion {
/**
* T1 , T2 , T3
*
* T1(lock) , T2(park), T3(park)
* Waited Queue -> Head-> T2 next -> T3
* T1(unlock) -> unpark all
* Waited Queue -> Head-> T2 next -> T3
* T2(free), T3(free)
*
* -> T2(lock) , T3(park)
* Waited Queue -> Head-> T3
* T2(unlock) -> unpark all
* T3(free)
*/
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
// thread[main] ->
// lock lock lock
// main -> action1() -> action2() -> action3()
synchronizedAction(ReentrantLockQuestion::action1);
}
private static void action1() {
synchronizedAction(ReentrantLockQuestion::action2);
}
private static void action2() {
synchronizedAction(ReentrantLockQuestion::action3);
}
private static void action3() {
System.out.println("Hello,World");
}
private static void synchronizedAction(Runnable runnable) {
lock.lock();
try {
runnable.run();
} finally {
lock.unlock();
}
}
}
請說明 Lock#lock() 與 Lock#lockInterruptibly() 的區別?
/**
* java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued
* 如果當前線程已被其他線程調用了 interrupt() 方法時,這時會返回 true
* acquireQueued 執行完時,interrupt 清空(false)
* 再通過 selfInterrupt() 方法將狀態恢復(interrupt=true)
*/
public static void main(String[] args) {
lockVsLockInterruptibly();
}
private static void lockVsLockInterruptibly() {
try {
lock.lockInterruptibly();
action1();
} catch (InterruptedException e) {
// 顯示地恢復中斷狀態
Thread.currentThread().interrupt();
// 當前線程並未消亡,線程池可能還在存活
} finally {
lock.unlock();
}
}
lock() 優先考慮獲取鎖,待獲取鎖成功后,才響應中斷。
**lockInterruptibly() ** 優先考慮響應中斷,而不是響應鎖的普通獲取或重入獲取。
ReentrantLock.lockInterruptibly 允許在等待時由其它線程調用等待線程的 Thread.interrupt 方法來中斷等待線程的等待而直接返回,這時不用獲取鎖,而會拋出一個 InterruptedException。
ReentrantLock.lock 方法不允許 Thread.interrupt 中斷,即使檢測到 Thread.isInterrupted ,一樣會繼續嘗試獲取鎖,失敗則繼續休眠。只是在最后獲取鎖成功后再把當前線程置為 interrupted 狀態,然后再中斷線程。
二、條件變量
Condition
使用場景
CountDownLatch(condition變種)
CyclicBarrier
(循環屏障)- 信號量/燈
(Semaphore)java9
- 生產者和消費者
- 阻塞隊列
請解釋 Condition await() 和 signal() 與 Object wait () 和 notify() 的相同與差異?
相同:阻塞和釋放
差異:Java Thread 對象和實際 JVM 執行的 OS Thread 不是相同對象,JVM Thread 回調 Java Thread.run() 方法,同時 Thread 提供一些 native 方法來獲取 JVM Thread 狀態,當JVM thread 執行后,自動 notify()了。
三、屏障Barriers
請說明 CountDownLatch 與 CyclicBarrier 的區別?
CountDownLatch : 不可循環的,一次性操作(倒計時)。
public class CountDownLatchQuestion {
public static void main(String[] args) throws InterruptedException {
// 倒數計數 5
CountDownLatch latch = new CountDownLatch(5);
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 4; i++) {
executorService.submit(() -> {
action();
latch.countDown(); // -1
});
}
// 等待完成
// 當計數 > 0,會被阻塞
latch.await();
System.out.println("Done");
// 關閉線程池
executorService.shutdown();
}
private static void action() {
System.out.printf("線程[%s] 正在執行...\n", Thread.currentThread().getName()); // 2
}
}
CyclicBarrier:可循環的, 先計數 -1,再判斷當計數 > 0 時候,才阻塞。
public class CyclicBarrierQuestion {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(5); // 5
ExecutorService executorService = Executors.newFixedThreadPool(5); // 3
for (int i = 0; i < 20; i++) {
executorService.submit(() -> {
action();
try {
// CyclicBarrier.await() = CountDownLatch.countDown() + await()
// 先計數 -1,再判斷當計數 > 0 時候,才阻塞
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
}
// 盡可能不要執行完成再 reset
// 先等待 3 ms
executorService.awaitTermination(3, TimeUnit.MILLISECONDS);
// 再執行 CyclicBarrier reset
// reset 方法是一個廢操作
barrier.reset();
System.out.println("Done");
// 關閉線程池
executorService.shutdown();
}
private static void action() {
System.out.printf("線程[%s] 正在執行...\n", Thread.currentThread().getName()); // 2
}
}
請說明 Semaphore(信號量/燈) 的使用場景?
Semaphore 和Lock類似,比Lock靈活。其中有 acquire() 和 release() 兩種方法,arg 都等於 1。acquire() 會拋出 InterruptedException,同時從 sync.acquireSharedInterruptibly(arg:1)
可以看出是讀模式(shared); release()中可以計數,可以控制數量,permits可以傳遞N個數量。
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 5; i++) {
new SecurityCheckThread(i, semaphore).start();
}
}
private static class SecurityCheckThread extends Thread{
private int seq;
private Semaphore semaphore;
public SecurityCheckThread(int seq, Semaphore semaphore){
this.seq = seq;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("No." + seq + "乘客,正在查驗中....");
if (seq%2 == 0){
Thread.sleep(1000);
System.out.println("No." + seq + "乘客,身份可疑,不能出國!");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
System.out.println("No." + seq + "乘客已完成服務。");
}
}
}
}
CountDownLatch
實現
public class LegacyCountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 倒數計數 5
MyCountDownLatch latch = new MyCountDownLatch(5);
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executorService.submit(() -> {
action();
latch.countDown(); // -1
});
}
// 等待完成
// 當計數 > 0,會被阻塞
latch.await();
System.out.println("Done");
// 關閉線程池
executorService.shutdown();
}
private static void action() {
System.out.printf("線程[%s] 正在執行...\n", Thread.currentThread().getName()); // 2
}
/**
* Java 1.5+ Lock 實現
*/
private static class MyCountDownLatch {
private int count;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private MyCountDownLatch(int count) {
this.count = count;
}
public void await() throws InterruptedException {
// 當 count > 0 等待
if (Thread.interrupted()) {
throw new InterruptedException();
}
lock.lock();
try {
while (count > 0) {
condition.await(); // 阻塞當前線程
}
} finally {
lock.unlock();
}
}
public void countDown() {
lock.lock();
try {
if (count < 1) {
return;
}
count--;
if (count < 1) { // 當數量減少至0時,喚起被阻塞的線程
condition.signalAll();
}
} finally {
lock.unlock();
}
}
}
/**
* Java < 1.5 實現
*/
private static class LegacyCountDownLatch {
private int count;
private LegacyCountDownLatch(int count) {
this.count = count;
}
public void await() throws InterruptedException {
// 當 count > 0 等待
if (Thread.interrupted()) {
throw new InterruptedException();
}
synchronized (this) {
while (count > 0) {
wait(); // 阻塞當前線程
}
}
}
public void countDown() {
synchronized (this) {
if (count < 1) {
return;
}
count--;
if (count < 1) { // 當數量減少至0時,喚起被阻塞的線程
notifyAll();
}
}
}
}
}
四、線程池
請問J.U.C
中內建了幾種ExceptionService
實現?
1.5:ThreadPoolExecutor、ScheduledThreadPoolExecutor
1.7:ForkJoinPool
public class ExecutorServiceQuestion {
public static void main(String[] args) {
/**
* 1.5
* ThreadPoolExecutor
* ScheduledThreadPoolExecutor :: ThreadPoolExecutor
* 1.7
* ForkJoinPool
*/
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService = Executors.newScheduledThreadPool(2);
// executorService 不再被引用,它會被 GC -> finalize() -> shutdown()
ExecutorService executorService2 = Executors.newSingleThreadExecutor();
}
}
如何獲取 ThreadPoolExecutor 正在運行的線程?
public class ThreadPoolExecutorThreadQuestion {
public static void main(String[] args) throws InterruptedException {
// main 線程啟動子線程,子線程的創造來自於 Executors.defaultThreadFactory()
ExecutorService executorService = Executors.newCachedThreadPool();
// 之前了解 ThreadPoolExecutor beforeExecute 和 afterExecute 能夠獲取當前線程數量
Set<Thread> threadsContainer = new HashSet<>();
setThreadFactory(executorService, threadsContainer);
for (int i = 0; i < 9; i++) { // 開啟 9 個線程
executorService.submit(() -> {
});
}
// 線程池等待執行 3 ms
executorService.awaitTermination(3, TimeUnit.MILLISECONDS);
threadsContainer.stream()
.filter(Thread::isAlive)
.forEach(thread -> {
System.out.println("線程池創造的線程 : " + thread);
});
Thread mainThread = Thread.currentThread();
ThreadGroup mainThreadGroup = mainThread.getThreadGroup();
int count = mainThreadGroup.activeCount();
Thread[] threads = new Thread[count];
mainThreadGroup.enumerate(threads, true);
Stream.of(threads)
.filter(Thread::isAlive)
.forEach(thread -> {
System.out.println("線程 : " + thread);
});
// 關閉線程池
executorService.shutdown();
}
private static void setThreadFactory(ExecutorService executorService, Set<Thread> threadsContainer) {
if (executorService instanceof ThreadPoolExecutor) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
ThreadFactory oldThreadFactory = threadPoolExecutor.getThreadFactory();
threadPoolExecutor.setThreadFactory(new DelegatingThreadFactory(oldThreadFactory, threadsContainer));
}
}
private static class DelegatingThreadFactory implements ThreadFactory {
private final ThreadFactory delegate;
private final Set<Thread> threadsContainer;
private DelegatingThreadFactory(ThreadFactory delegate, Set<Thread> threadsContainer) {
this.delegate = delegate;
this.threadsContainer = threadsContainer;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = delegate.newThread(r);
// cache thread
threadsContainer.add(thread);
return thread;
}
}
}
五、Future
如何獲取 Future 對象?
submit()
請舉例 Future get() 以及 get(Long,TimeUnit) 方法的使用場景?
- 超時等待
InterruptedException
ExcutionException
TimeOutException
如何利用 Future 優雅地取消一個任務的執行?
public class CancellableFutureQuestion {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future future = executorService.submit(() -> { // 3秒內執行完成,才算正常
action(5);
});
try {
future.get(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Thread 恢復中斷狀態
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
// 執行超時,適當地關閉
Thread.currentThread().interrupt(); // 設置中斷狀態
future.cancel(true); // 嘗試取消
}
executorService.shutdown();
}
private static void action(int seconds) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(seconds)); // 5 - 3
// seconds - timeout = 剩余執行時間
if (Thread.interrupted()) { // 判斷並且清除中斷狀態
return;
}
action();
} catch (InterruptedException e) {
}
}
private static void action() {
System.out.printf("線程[%s] 正在執行...\n", Thread.currentThread().getName()); // 2
}
}
六、Volatile
在 Java 中,volatile 保證的是可見性還是原子性?
volatile 既有可見性又有原子性(非我及彼),可見性是一定的,原子性是看情況的。對象類型和原生類型都是可見性,原生類型是原子性。
在 Java 中,volatile long 和 double 是線程安全的嗎?
volatile long 和 double 是線程安全的。
在 Java 中,volatile 底層實現是基於什么機制?
內存屏障(變量 Lock)機制:一個變量的原子性的保證。
七、原子操作Atomic
為什么 AtomicBoolean 內部變量使用 int 實現,而非 boolean?
操作系統有 X86 和 X64,在虛擬機中,基於boolean 實現就是用 int 實現的,用哪一種實現都可以。虛擬機只有32位和64位的,所以用32位的實現。
在變量原子操作時,Atomic CAS 操作比 synchronized 關鍵字哪個更重?*
同線程的時候,synchronized 更快;而多線程的時候則要分情況討論。
public class AtomicQuestion {
private static int actualValue = 3;
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(3);
// if( value == 3 )
// value = 5
atomicInteger.compareAndSet(3, 5);
// 偏向鎖 < CAS 操作 < 重鎖(完全互斥)
// CAS 操作也是相對重的操作,它也是實現 synchronized 瘦鎖(thin lock)的關鍵
// 偏向鎖就是避免 CAS(Compare And Set/Swap)操作
}
private synchronized static void compareAndSet(int expectedValue, int newValue) {
if (actualValue == expectedValue) {
actualValue = newValue;
}
}
}
Atomic CAS 的底層是如何實現?*
匯編指令:cpmxchg
(Compare and Exchange)