1. JUC 簡介
- 在 Java 5.0 提供了
java.util.concurrent(簡稱JUC)包,在此包中增加了在並發編程中很常用的工具類,
用於定義類似於線程的自定義子系統,包括線程池,異步 IO 和輕量級任務框架;還提供了設計用於多線程上下文中
的 Collection 實現等;
2. volatile 關鍵字
- volatile 關鍵字: 當多個線程進行操作共享數據時,可以保證內存中的數據是可見的;相較於 synchronized 是一種
較為輕量級的同步策略; - volatile 不具備"互斥性";
- volatile 不能保證變量的"原子性";
// 使用 volatile 之前
public class TestVolatile{
public static void main(String[] args){
ThreadDemo td = new ThreadDemo();
new Thread(td).start();
while(true){
if(td.isFlag()){
System.out.println("########");
break;
}
}
}
}
class ThreadDemo implements Runnable{
private boolean flag = false;
public void run(){
try{
// 該線程 sleep(200), 導致了程序無法執行成功
Thread.sleep(200);
}catch(InterruptedException e){
e.printStackTrace();
}
flag = true;
Sytem.out.println("flag="+isFlag());
}
public boolean isFlag(){
return flag;
}
public void setFlag(boolean flag){
this.flag = flag;
}
}


// 解決問題方式一: 同步鎖
// 但是,效率太低
public class TestVolatile{
public static void main(String[] args){
ThreadDemo td = new ThreadDemo();
new Thread(td).start();
while(true){
// 使用同步鎖
synchronized(td){
if(td.isFlag()){
System.out.println("########");
break;
}
}
}
}
}
// 解決方式二: 使用 volatile 關鍵字
public class TestVolatile{
public static void main(String[] args){
ThreadDemo td = new ThreadDemo();
new Thread(td).start();
while(true){
if(td.isFlag()){
System.out.println("########");
break;
}
}
}
}
class ThreadDemo implements Runnable{
private volatile boolean flag = false;
同上(略)
}
3. i++ 的原子性問題
i++的操作實際上分為三個步驟: "讀-改-寫";- 原子性: 就是"i++"的"讀-改-寫"是不可分割的三個步驟;
- 原子變量: JDK1.5 以后,
java.util.concurrent.atomic包下,提供了常用的原子變量;- 原子變量中的值,使用
volatile修飾,保證了內存可見性; - CAS(Compare-And-Swap) 算法保證數據的原子性;
- 原子變量中的值,使用
int i = 10;
i = i++; // 此時, i=10
執行步驟:
int temp = i;
i = i + 1;
i = temp;
// 測試類
public class TestAtomicDemo{
public static void main(String[] args){
AtomicDemo ad = new AtomicDemo();
for(int i=0; i < 10; i++){
new Thread(ad).start();
}
}
}
class AtomicDemo implements Runnable{
private int serialNumber = 0;
public void run(){
try{
Thread.sleep(200);
}catch(InterruptedException e){
}
System.out.println(Thread.currentThread().getName() + ":" + getSerialNumber());
}
public int getSerialNumber(){
return serialNumber++;
}
}

// 改進: 使用原子變量
class AtomicDemo implements Runnable{
private AtomicInteger serialNumber = new AtomicInteger();
public void run(){
try{
Thread.sleep(200);
}catch(InterruptedException e){
}
System.out.println(Thread.currentThread().getName()+":"+getSerialNumber());
}
public int getSerialNumber(){
// 自增運算
return serialNumber.getAndIncrement();
}
}
3.1 CAS 算法
- CAS(Compare-And-Swap) 算法是硬件對於並發的支持,針對多處理器操作而設計的處理器中的一種特殊指令,用於
管理對共享數據的並發訪問; - CAS 是一種無鎖的非阻塞算法的實現;
- CAS 包含了三個操作數:
- 需要讀寫的內存值: V
- 進行比較的預估值: A
- 擬寫入的更新值: B
- 當且僅當 V == A 時, V = B, 否則,將不做任何操作;
// 模擬CAS 算法
class CompareAndSwap{
private int value;
// 獲取內存值
public synchronized int get(){
return value;
}
// 無論更新成功與否,都返回修改之前的內存值
public synchronized int compareAndSwap(int expectedValue, int newValue){
// 獲取舊值
int oldValue = value;
if(oldValue == expectedValue){
this.value = newValue;
}
// 返回修改之前的值
return oldValue;
}
// 判斷是否設置成功
public synchronized boolean compareAndSet(int expectedValue, int newValue){
return expectedValue == compareAndSwap(expectedValue, newValue);
}
}
public class TestCompareAndSwap{
public static void main(String[] args){
final CopareAndSwap cas = new CompareAndSwap();
for(int i=0; i<10; i++){
// 創建10個線程,模擬多線程環境
new Thead(new Runnable(){
public void run(){
int expectedValue = cas.get();
boolean b = cas.compareAndSet(expectedValue, (int)(Math.random()*100));
System.out.println(b);
}
}).start();
}
}
}
4. 並發容器類
- Java 5.0 在
java.util.concurrent包中提供了多種並發容器類來改進同步容器的性能;
4.1 ConcurrentHashMap
- ConcurrentHashMap 同步容器類是 Java5 增加的一個線程安全的哈希表;介於 HashMap 與 Hashtable 之間;
內部采用"鎖分段"機制替代Hashtable的獨占鎖,進而提高性能; - 此包還提供了設計用於多線程上下文中的
Collection實現:ConcurrentHashMap,ConcurrentSkipListMap
ConcurrentSkipListSet,CopyOnWriteArrayList和CopyOnWriteArraySet;- 當期望許多線程訪問一個給定collection時,
ConcurrentHashMap通常優於同步的HashMap;
ConcurrentSkipListMap通常優於同步的TreeMap; - 當期望的讀數和遍歷遠遠大於列表的更新數時,
CopyOnWriteArrayList優於同步的ArrayList;
- 當期望許多線程訪問一個給定collection時,
4.2 CountDownLatch(閉鎖)
CountDownLatch是一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待;
// 測試類: 計算多線程的執行時間
public class TestCountDownLatch{
public static void main(String[] args){
final CountDownLatch latch = new CountDownLatch(10);
LatchDemo ld = new LatchDemo(latch);
long start = System.currentTimeMillis();
// 創建10個線程
for(int i=0; i<10; i++){
new Thread(ld).start();
}
try{
latch.await();
}catch(InterruptedException e){
}
long end = System.currentTimeMillis();
System.out.println("耗費時間為:"+(end - start));
}
}
class LatchDemo implements Runnable{
private CountDownLatch latch;
// 有參構造器
public LatchDemo(CountDownLatch latch){
this.latch = latch;
}
public void run(){
synchronized(this){
try{
// 打印50000以內的偶數
for(int i=0; i<50000; i++){
if(i % 2 == 0){
System.out.println(i);
}
}
}finally{
// 線程數量遞減
latch.countDown();
}
}
}
}
5. 創建執行線程的方式三
- 相較於實現 Runnable 接口的方式,實現 Callable 接口類中的方法可以有返回值,並且可以拋出異常;
// 測試類
public class TestCallable{
public static void main(String[] args){
ThreadDemo td = new ThreadDemo();
// 執行 Callable 方式,需要 FutureTask 實現類的支持
// FutureTask 實現類用於接收運算結果, FutureTask 是 Future 接口的實現類
FutureTask<Integer> result = new FutureTask<>(td);
new Thread(result).start();
// 接收線程運算后的結果
try{
// 只有當 Thread 線程執行完成后,才會打印結果;
// 因此, FutureTask 也可用於閉鎖
Integer sum = result.get();
System.out.println(sum);
}catch(InterruptedException | ExecutionException e){
e.printStackTrace();
}
}
}
class ThreadDemo implements Callable<Integer>{
// 需要實現的方法
public Integer call() throws Exception{
// 計算 0~100 的和
int sum = 0;
for(int i=0; i<=100; i++){
sum += i;
}
return sum;
}
}
6. 同步鎖(Lock)
// 測試類: 以賣票為例
// 使用 lock 之前
public class TestLock{
public static void main(String[] args){
Ticket ticket = new Ticket();
new Thread(ticket,"1號窗口").start();
new Thread(ticket,"2號窗口").start();
new Thread(ticket,"3號窗口").start();
}
}
class Ticket implements Runnable{
private int tick = 100;
public void run(){
while(true){
if(tick > 0){
try{
Thread.sleep(200);
}catch(InterruptedException e){
}
System.out.println(Thread.currentThread().getName()+"完成售票,余票為: "+ --tick);
}
}
}
}
// 使用 Lock
class Ticket implements Runnable{
private int tick = 100;
private Lock lock = new ReentrantLock();
public void run(){
while(true){
// 上鎖
lock.lock();
try{
if(tick > 0){
try{
Thread.sleep(200);
}catch(InterruptedException e){
}
System.out.println(Thread.currentThread().getName()+"完成售票,余票為: "+ --tick);
}
}finally{
// 釋放鎖
lock.unlock();
}
}
}
}
// 練習: 程序按序交替
// 編寫一個程序,開啟3個線程,這三個線程的 ID 分別為 A, B, C, 每個線程將自己的 ID 在屏幕上打印10遍,
// 要求輸出的結果必須按順序顯示:
// 如: ABCABCABC... 依次遞歸
public class TestABCAlternate{
public static void main(String[] args){
AlternateDemo ad = new AlternateDemo();
new Thread(new Runnable(){
public void run(){
for(int i=1; i<20; i++){
ad.loopA(i);
}
}
},"A").start();
new Thread(new Runnable(){
public void run(){
for(int i=1; i<20; i++){
ad.loopB(i);
}
}
},"B").start();
new Thread(new Runnable(){
public void run(){
for(int i=1; i<20; i++){
ad.loopC(i);
System.out.println("--------------------");
}
}
},"C").start();
}
}
class AlternateDemo{
private int number = 1; // 當前正在執行線程的標記
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
// totalLoop 表示循環第幾輪
// 線程A
public void loopA(int totalLoop){
// 上鎖
lock.lock();
try{
// 1. 判斷
if(number != 1){
condition1.await();
}
// 2. 打印
for(int i=1; i <= 5; i++){
System.out.println(Thread.currentThread().getName()+"\t"+i+"\t"+totalLoop);
}
// 3. 喚醒線程B
number = 2;
condition2.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
// 釋放鎖
lock.unlock();
}
}
// 線程B
public void loopB(int totalLoop){
// 上鎖
lock.lock();
try{
// 1. 判斷
if(number != 2){
condition2.await();
}
// 2. 打印
for(int i=1; i <= 15; i++){
System.out.println(Thread.currentThread().getName()+"\t"+i+"\t"+totalLoop);
}
// 3. 喚醒線程C
number = 3;
condition3.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
// 釋放鎖
lock.unlock();
}
}
// 線程C
public void loopC(int totalLoop){
// 上鎖
lock.lock();
try{
// 1. 判斷
if(number != 3){
condition3.await();
}
// 2. 打印
for(int i=1; i <= 20; i++){
System.out.println(Thread.currentThread().getName()+"\t"+i+"\t"+totalLoop);
}
// 3. 喚醒線程A
number = 1;
condition1.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
// 釋放鎖
lock.unlock();
}
}
}
7. ReadWriteLock(讀寫鎖)
// 測試類
public class TestReadWriteLock{
public static void main(String[] args){
ReadWriteLockDemo rw = new ReadWriteLockDemo();
// 一個線程進行寫
new Thread(new Runnable(){
public void run(){
rw.set((int)(Math.random()*100));
}
},"Write:").start();
// 100個線程進行讀操作
for(int i=0; i<100; i++){
new Thread(new Runnable(){
public void run(){
rw.get();
}
},"Read:").start();
}
}
}
class ReadWriteLockDemo{
private int number = 0;
private ReadWriteLock lock = new ReentrantReadWriteLock();
// 讀
public void get(){
lock.readLock().lock(); // 上鎖
try{
System.out.println(Thread.currentThread().getName()+":"+number);
}finally{
lock.readLock().unlock(); // 釋放鎖
}
}
// 寫
public void set(int number){
lock.writeLock().lock();
try{
System.out.println(Thread.currentThread().getName());
this.number = number;
}finally{
lock.writeLock().unlock();
}
}
}
8. 線程八鎖
// 測試類
public class Test{
public static void main(String[] args){
Demo demo = new Demo();
Demo demo2 = new Demo();
new Thread(new Runnable(){
public void run(){
demo.getOne();
}
}).start();
new Thread(new Runnable(){
public void run(){
// demo2.getTwo();
demo.getTwo();
}
}).start();
}
}
class Demo{
public synchronized void getOne(){
try{
Thread.sleep(3000);
}catch(InterruptedException e){
}
System.out.println("one");
}
public synchronized void getTwo(){
System.out.println("two");
}
}
/*
* 1. 兩個普通同步方法,兩個線程,標准打印, 打印輸出: one two
* 2. 新增 Thread.sleep() 給 getOne(), 打印輸出: one two
* 3. 新增普通方法 getThree(), 打印輸出: three one two
* 4. 兩個普通同步方法,兩個Demo對象, 兩個線程,打印輸出: two one
* 5. 修改 getOne() 為靜態同步方法, 一個Demo對象, 打印輸出: two one
* 6. 修改兩個方法都為靜態同步方法, 一個 Demo 對象, 打印輸出: one two
* 7. 修改 getone() 為靜態同步方法, 兩個 Demo 對象, 打印輸出: two one
* 8. 兩個均為靜態同步方法,兩個 Demo 對象,打印輸出: one two
*/
// 總結:
// 1. 非靜態方法的鎖默認為 this, 靜態方法的鎖為 "對應的Class實例";
// 2. 在某一個時刻內,只能有一個線程持有鎖,無論幾個方法;
9. 線程池
- 線程池提供了一個線程隊列,隊列中保存着所有等待狀態的線程;
- 避免了創建與銷毀線程的額外開銷,提高了響應速度;
- 線程池的體系結構
java.util.concurrent.Executor: 負責線程的使用和調度的根接口;ExecutorService: 子接口,線程池的主要接口;ThreadPoolExecutor: 線程池的實現類;ScheduledExecutorService: 子接口,負責線程的調度;ScheduledThreadPoolExecutor: 繼承了線程池的實現類,實現了負責線程調度的子接口;
- 工具類:
ExecutorsExecutorService newFixedThreadPool(): 創建固定大小的線程池;ExecutorService newCachedThreadPool(): 緩存線程池,線程池中線程的數量不固定,可以根據需求自動更改數量;ExecutorService newSingleThreadExecutor(): 創建單個線程池, 線程池中只有一個線程;ScheduledExecutorService newScheduledThreadPool(): 創建固定大小的線程,可以延時或定時的執行任務;
public class TestThreadPool{
public static void main(String[] args){
// 1. 創建線程池
ExecutorService pool = Executors.newFixedThreadPool(5);
ThreadPoolDemo tpd = new ThreadPoolDemo();
// 2. 為線程池中線程分配任務
// submit(Callable<T> task)
// submit(Runnable task)
for(int i=0; i<10; i++){
pool.submit(tpd);
}
// 3. 關閉線程池
pool.shutdown();
}
}
class ThreadPoolDemo implements Runnable{
private int i=0;
public void run(){
while(i <= 100){
System.out.println(Thread.currentThread().getName()+" : "+ i++)
}
}
}
9.1 線程調度
public class TestScheduledThreadPool{
public static void main(String[] args) throws Exception{
// 1. 創建線程池
ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
// 2. 分配任務
// pool.shedule(Callalbe<T> callable, long delay, TimeUnit unit(時間單位))
for(int i=0; i < 10; i++){
Future<Integer> result = pool.schedule(new Callable<Integer>(){
public Integer call() throws Exception{
// 產生100以內的隨機數
int num = new Random().nextInt(100);
System.out.println(Thread.currentThread().getName()+ ":" + num);
return num;
}
}, 3, TimeUnit.SECONDS);
System.out.println(result.get());
}
//3. 關閉線程池
pool.shutdown();
}
}
10 Fork/Join 框架
public class TestForkJoinPool{
public static void main(String[] args){
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L, 100000000L);
Long sum = pool.invoke(task);
System.out.println(sum);
}
}
class ForkJoinSumCalculate extends RecursiveTask<Long>{
private static final long serialVersionUID = 24340990L;
private long start;
private long end;
private static final long THURSHOLD = 10000L; // 拆分臨界值
// 有參構造器
public ForkJoinSumCalculate(long start, long end){
this.start = start;
this.end = end;
}
public Long compute(){
long length = end - start;
if(length <= THURSHOLD){
long sum = 0L;
for(long i = start; i<=end; i++){
sum += i;
}
return sum;
}else{
long middle = (start + end ) / 2;
ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle);
left.fork(); // 進行拆分,同時壓入線程隊列
ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle + 1, end);
right.fork(); // 進行拆分,同時壓入線程隊列
return left.join() + right.join();
}
}
}
參考資料
