Java 之 JUC


1. JUC 簡介

  • 在 Java 5.0 提供了 java.util.concurrent(簡稱JUC)包,在此包中增加了在並發編程中很常用的工具類,
    用於定義類似於線程的自定義子系統,包括線程池,異步 IO 和輕量級任務框架;還提供了設計用於多線程上下文中
    的 Collection 實現等;

2. volatile 關鍵字

  • volatile 關鍵字: 當多個線程進行操作共享數據時,可以保證內存中的數據是可見的;相較於 synchronized 是一種
    較為輕量級的同步策略;
  • volatile 不具備"互斥性";
  • volatile 不能保證變量的"原子性";
// 使用 volatile 之前
public class TestVolatile{

    public static void main(String[] args){
        ThreadDemo td = new ThreadDemo();
        new Thread(td).start();

        while(true){
            if(td.isFlag()){
                System.out.println("########");
                break;
            }
        }
    }
}

class ThreadDemo implements Runnable{
    private boolean flag = false;

    public void run(){
        try{
            // 該線程 sleep(200), 導致了程序無法執行成功
            Thread.sleep(200);
        }catch(InterruptedException e){
            e.printStackTrace();
        }

        flag = true;

        Sytem.out.println("flag="+isFlag());
    }

    public boolean isFlag(){
        return flag;
    }

    public void setFlag(boolean flag){
        this.flag = flag;
    }
}

// 解決問題方式一: 同步鎖
//   但是,效率太低
public class TestVolatile{

    public static void main(String[] args){
        ThreadDemo td = new ThreadDemo();
        new Thread(td).start();


        while(true){
            // 使用同步鎖
            synchronized(td){
                if(td.isFlag()){
                    System.out.println("########");
                    break;
                }
            }
        }
    }
}

// 解決方式二: 使用 volatile 關鍵字
public class TestVolatile{

    public static void main(String[] args){
        ThreadDemo td = new ThreadDemo();
        new Thread(td).start();

        while(true){
            if(td.isFlag()){
                System.out.println("########");
                break;
            }
        }
    }
}

class ThreadDemo implements Runnable{
    private volatile boolean flag = false;

    同上(略)
}

3. i++ 的原子性問題

  1. i++的操作實際上分為三個步驟: "讀-改-寫";
  2. 原子性: 就是"i++"的"讀-改-寫"是不可分割的三個步驟;
  3. 原子變量: JDK1.5 以后, java.util.concurrent.atomic包下,提供了常用的原子變量;
    • 原子變量中的值,使用 volatile 修飾,保證了內存可見性;
    • CAS(Compare-And-Swap) 算法保證數據的原子性;
int i = 10;
i = i++;  // 此時, i=10

執行步驟:
int temp = i;
i = i + 1;
i = temp;


// 測試類
public class TestAtomicDemo{
    public static void main(String[] args){

        AtomicDemo ad = new AtomicDemo();

        for(int i=0; i < 10; i++){
            new Thread(ad).start();
        }
    }
}

class AtomicDemo implements Runnable{
    private int serialNumber = 0;

    public void run(){

        try{
            Thread.sleep(200);
        }catch(InterruptedException e){

        }

        System.out.println(Thread.currentThread().getName() + ":" + getSerialNumber());
    }

    public int getSerialNumber(){
        return serialNumber++;
    }
}

// 改進: 使用原子變量
class AtomicDemo implements Runnable{

    private AtomicInteger serialNumber = new AtomicInteger();

    public void run(){
        try{
            Thread.sleep(200);
        }catch(InterruptedException e){

        }

        System.out.println(Thread.currentThread().getName()+":"+getSerialNumber());
    }

    public int getSerialNumber(){
        // 自增運算
        return serialNumber.getAndIncrement();
    }
}

3.1 CAS 算法

  • CAS(Compare-And-Swap) 算法是硬件對於並發的支持,針對多處理器操作而設計的處理器中的一種特殊指令,用於
    管理對共享數據的並發訪問;
  • CAS 是一種無鎖的非阻塞算法的實現;
  • CAS 包含了三個操作數:
    • 需要讀寫的內存值: V
    • 進行比較的預估值: A
    • 擬寫入的更新值: B
    • 當且僅當 V == A 時, V = B, 否則,將不做任何操作;
// 模擬CAS 算法
class CompareAndSwap{
    private int value;

    // 獲取內存值
    public synchronized int get(){
        return value;
    }

    // 無論更新成功與否,都返回修改之前的內存值
    public synchronized int compareAndSwap(int expectedValue, int newValue){
        // 獲取舊值
        int oldValue = value;

        if(oldValue == expectedValue){
            this.value = newValue;
        }

        // 返回修改之前的值
        return oldValue;
    }

    // 判斷是否設置成功
    public synchronized boolean compareAndSet(int expectedValue, int newValue){
        return expectedValue == compareAndSwap(expectedValue, newValue);
    }
}

public class TestCompareAndSwap{
    public static void main(String[] args){
        final CopareAndSwap cas = new CompareAndSwap();

        for(int i=0; i<10; i++){
            // 創建10個線程,模擬多線程環境
            new Thead(new Runnable(){
                public void run(){
                    int expectedValue = cas.get();

                    boolean b = cas.compareAndSet(expectedValue, (int)(Math.random()*100));
                    System.out.println(b);
                }
            }).start();
        }
    }
}

4. 並發容器類

  • Java 5.0 在 java.util.concurrent 包中提供了多種並發容器類來改進同步容器的性能;

4.1 ConcurrentHashMap

  • ConcurrentHashMap 同步容器類是 Java5 增加的一個線程安全的哈希表;介於 HashMap 與 Hashtable 之間;
    內部采用"鎖分段"機制替代Hashtable的獨占鎖,進而提高性能;
  • 此包還提供了設計用於多線程上下文中的Collection實現: ConcurrentHashMap,ConcurrentSkipListMap
    ConcurrentSkipListSet, CopyOnWriteArrayListCopyOnWriteArraySet;
    • 當期望許多線程訪問一個給定collection時,ConcurrentHashMap通常優於同步的HashMap;
      ConcurrentSkipListMap通常優於同步的TreeMap;
    • 當期望的讀數和遍歷遠遠大於列表的更新數時, CopyOnWriteArrayList優於同步的ArrayList;

4.2 CountDownLatch(閉鎖)

  • CountDownLatch是一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待;
// 測試類: 計算多線程的執行時間
public class TestCountDownLatch{
    public static void main(String[] args){
        final CountDownLatch latch = new CountDownLatch(10);
        LatchDemo ld = new LatchDemo(latch);

        long start = System.currentTimeMillis();

        // 創建10個線程
        for(int i=0; i<10; i++){
            new Thread(ld).start();
        }

        try{
            latch.await();
        }catch(InterruptedException e){

        }

        long end = System.currentTimeMillis();

        System.out.println("耗費時間為:"+(end - start));

    }
}

class LatchDemo implements Runnable{
    private CountDownLatch latch;

    // 有參構造器
    public LatchDemo(CountDownLatch latch){
        this.latch = latch;
    }

    public void run(){

        synchronized(this){
            try{
                // 打印50000以內的偶數
                for(int i=0; i<50000; i++){
                    if(i % 2 == 0){
                        System.out.println(i);
                    }
                }
            }finally{
                // 線程數量遞減
                latch.countDown();
            }
        }
    }
}

5. 創建執行線程的方式三

  • 相較於實現 Runnable 接口的方式,實現 Callable 接口類中的方法可以有返回值,並且可以拋出異常;
// 測試類
public class TestCallable{
    public static void main(String[] args){

        ThreadDemo td = new ThreadDemo();

        // 執行 Callable 方式,需要 FutureTask 實現類的支持
        // FutureTask 實現類用於接收運算結果, FutureTask 是 Future 接口的實現類
        FutureTask<Integer> result = new FutureTask<>(td);

        new Thread(result).start();

        // 接收線程運算后的結果
        try{
            // 只有當 Thread 線程執行完成后,才會打印結果;
            // 因此, FutureTask 也可用於閉鎖
            Integer sum = result.get();
            System.out.println(sum);
        }catch(InterruptedException | ExecutionException e){
            e.printStackTrace();
        }
    }
}

class ThreadDemo implements Callable<Integer>{

    // 需要實現的方法
    public Integer call() throws Exception{
        // 計算 0~100 的和
        int sum = 0;

        for(int i=0; i<=100; i++){
            sum += i;
        }

        return sum;
    }
}

6. 同步鎖(Lock)

// 測試類: 以賣票為例
// 使用 lock 之前
public class TestLock{
    public static void main(String[] args){
        Ticket ticket = new Ticket();

        new Thread(ticket,"1號窗口").start();
        new Thread(ticket,"2號窗口").start();
        new Thread(ticket,"3號窗口").start();
    }
}

class Ticket implements Runnable{

    private int tick = 100;

    public void run(){
        while(true){
            if(tick > 0){
                try{
                    Thread.sleep(200);
                }catch(InterruptedException e){

                }

                System.out.println(Thread.currentThread().getName()+"完成售票,余票為: "+ --tick);
            }
        }
    }
}

// 使用 Lock
class Ticket implements Runnable{

    private int tick = 100;

    private Lock lock = new ReentrantLock();

    public void run(){
        while(true){
            // 上鎖
            lock.lock();

            try{
                if(tick > 0){
                    try{
                        Thread.sleep(200);
                    }catch(InterruptedException e){

                    }
                    System.out.println(Thread.currentThread().getName()+"完成售票,余票為: "+ --tick);
                }
            }finally{
                // 釋放鎖
                lock.unlock();
            }
        }
    }
}

// 練習: 程序按序交替
// 編寫一個程序,開啟3個線程,這三個線程的 ID 分別為 A, B, C, 每個線程將自己的 ID 在屏幕上打印10遍,
// 要求輸出的結果必須按順序顯示:
// 如: ABCABCABC... 依次遞歸

public class TestABCAlternate{
    public static void main(String[] args){
        AlternateDemo ad = new AlternateDemo();

        new Thread(new Runnable(){
            public void run(){
                for(int i=1; i<20; i++){
                    ad.loopA(i);
                }
            }
        },"A").start();

        new Thread(new Runnable(){
            public void run(){
                for(int i=1; i<20; i++){
                    ad.loopB(i);
                }
            }
        },"B").start();

        new Thread(new Runnable(){
            public void run(){
                for(int i=1; i<20; i++){
                    ad.loopC(i);

                    System.out.println("--------------------");
                }
            }
        },"C").start();
    }
}

class AlternateDemo{

    private int number = 1; // 當前正在執行線程的標記

    private Lock lock = new ReentrantLock();
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();

    // totalLoop 表示循環第幾輪
    // 線程A
    public void loopA(int totalLoop){
        // 上鎖
        lock.lock();

        try{
            // 1. 判斷
            if(number != 1){
                condition1.await();
            }

            // 2. 打印
            for(int i=1; i <= 5; i++){
                System.out.println(Thread.currentThread().getName()+"\t"+i+"\t"+totalLoop);
            }

            // 3. 喚醒線程B
            number = 2;
            condition2.signal();

        }catch(Exception e){
            e.printStackTrace();
        }finally{
            // 釋放鎖
            lock.unlock();
        }
    }

    // 線程B
    public void loopB(int totalLoop){
        // 上鎖
        lock.lock();

        try{
            // 1. 判斷
            if(number != 2){
                condition2.await();
            }

            // 2. 打印
            for(int i=1; i <= 15; i++){
                System.out.println(Thread.currentThread().getName()+"\t"+i+"\t"+totalLoop);
            }

            // 3. 喚醒線程C
            number = 3;
            condition3.signal();

        }catch(Exception e){
            e.printStackTrace();
        }finally{
            // 釋放鎖
            lock.unlock();
        }
    }

    // 線程C
    public void loopC(int totalLoop){
        // 上鎖
        lock.lock();

        try{
            // 1. 判斷
            if(number != 3){
                condition3.await();
            }

            // 2. 打印
            for(int i=1; i <= 20; i++){
                System.out.println(Thread.currentThread().getName()+"\t"+i+"\t"+totalLoop);
            }

            // 3. 喚醒線程A
            number = 1;
            condition1.signal();

        }catch(Exception e){
            e.printStackTrace();
        }finally{
            // 釋放鎖
            lock.unlock();
        }
    }
}

7. ReadWriteLock(讀寫鎖)

// 測試類
public class TestReadWriteLock{

    public static void main(String[] args){
        ReadWriteLockDemo rw = new ReadWriteLockDemo();

        // 一個線程進行寫
        new Thread(new Runnable(){
            public void run(){
                rw.set((int)(Math.random()*100));
            }
        },"Write:").start();

        // 100個線程進行讀操作
        for(int i=0; i<100; i++){
            new Thread(new Runnable(){
                public void run(){
                    rw.get();
                }
            },"Read:").start();
        }
    }

}

class ReadWriteLockDemo{
    private int number = 0;

    private ReadWriteLock lock = new ReentrantReadWriteLock();



    // 讀
    public void get(){
        lock.readLock().lock(); // 上鎖

        try{
            System.out.println(Thread.currentThread().getName()+":"+number);
        }finally{
            lock.readLock().unlock(); // 釋放鎖
        }
    }

    // 寫
    public void set(int number){

        lock.writeLock().lock();

        try{    
            System.out.println(Thread.currentThread().getName());

            this.number = number;
        }finally{
            lock.writeLock().unlock();
        }
    }
}

8. 線程八鎖

// 測試類
public class Test{
    public static void main(String[] args){
        Demo demo = new Demo();

        Demo demo2 = new Demo();

        new Thread(new Runnable(){
            public void run(){
                demo.getOne();
            }
        }).start();

        new Thread(new Runnable(){
            public void run(){

                // demo2.getTwo();
                demo.getTwo();
            }
        }).start();
    }
}

class Demo{
    public synchronized void getOne(){
        try{
            Thread.sleep(3000);
        }catch(InterruptedException e){

        }

        System.out.println("one");
    }

    public synchronized void getTwo(){
        System.out.println("two");
    }

}

/*
 * 1. 兩個普通同步方法,兩個線程,標准打印, 打印輸出: one  two
 * 2. 新增 Thread.sleep() 給 getOne(), 打印輸出: one  two
 * 3. 新增普通方法 getThree(), 打印輸出: three  one  two
 * 4. 兩個普通同步方法,兩個Demo對象, 兩個線程,打印輸出: two  one
 * 5. 修改 getOne() 為靜態同步方法, 一個Demo對象, 打印輸出: two  one
 * 6. 修改兩個方法都為靜態同步方法, 一個 Demo 對象, 打印輸出: one  two
 * 7. 修改 getone() 為靜態同步方法, 兩個 Demo 對象, 打印輸出: two  one
 * 8. 兩個均為靜態同步方法,兩個 Demo 對象,打印輸出: one  two
 */

 // 總結:
 //    1. 非靜態方法的鎖默認為 this, 靜態方法的鎖為 "對應的Class實例";
 //    2. 在某一個時刻內,只能有一個線程持有鎖,無論幾個方法;

9. 線程池

  • 線程池提供了一個線程隊列,隊列中保存着所有等待狀態的線程;
  • 避免了創建與銷毀線程的額外開銷,提高了響應速度;
  • 線程池的體系結構
    • java.util.concurrent.Executor: 負責線程的使用和調度的根接口;
    • ExecutorService: 子接口,線程池的主要接口;
    • ThreadPoolExecutor: 線程池的實現類;
    • ScheduledExecutorService: 子接口,負責線程的調度;
    • ScheduledThreadPoolExecutor: 繼承了線程池的實現類,實現了負責線程調度的子接口;
  • 工具類: Executors
    • ExecutorService newFixedThreadPool(): 創建固定大小的線程池;
    • ExecutorService newCachedThreadPool(): 緩存線程池,線程池中線程的數量不固定,可以根據需求自動更改數量;
    • ExecutorService newSingleThreadExecutor(): 創建單個線程池, 線程池中只有一個線程;
    • ScheduledExecutorService newScheduledThreadPool(): 創建固定大小的線程,可以延時或定時的執行任務;
public class TestThreadPool{
    public static void main(String[] args){
        // 1. 創建線程池
        ExecutorService pool = Executors.newFixedThreadPool(5);

        ThreadPoolDemo tpd = new ThreadPoolDemo();

        // 2. 為線程池中線程分配任務
        //    submit(Callable<T> task)
        //    submit(Runnable task)

        for(int i=0; i<10; i++){
            pool.submit(tpd);
        }

        // 3. 關閉線程池
        pool.shutdown();
    }
}

class ThreadPoolDemo implements Runnable{

    private int i=0;

    public void run(){
        while(i <= 100){
            System.out.println(Thread.currentThread().getName()+" : "+ i++)
        }
    }
}

9.1 線程調度

public class TestScheduledThreadPool{

    public static void main(String[] args) throws Exception{
        // 1. 創建線程池
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);

        // 2. 分配任務
        //      pool.shedule(Callalbe<T> callable, long delay, TimeUnit unit(時間單位))

        for(int i=0; i < 10; i++){
            Future<Integer> result = pool.schedule(new Callable<Integer>(){

                public Integer call() throws Exception{
                        // 產生100以內的隨機數
                        int num = new Random().nextInt(100);

                        System.out.println(Thread.currentThread().getName()+ ":" + num);

                        return num;
                    }
                }, 3, TimeUnit.SECONDS);

                System.out.println(result.get());
        }

        //3. 關閉線程池
        pool.shutdown();
    }
}

10 Fork/Join 框架

public class TestForkJoinPool{
    public static void main(String[] args){
        ForkJoinPool pool = new ForkJoinPool();

        ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L, 100000000L);

        Long sum = pool.invoke(task);

        System.out.println(sum);
    }

}

class ForkJoinSumCalculate extends RecursiveTask<Long>{

    private static final long serialVersionUID = 24340990L;

    private long start;
    private long end;

    private static final long THURSHOLD = 10000L;  // 拆分臨界值

    // 有參構造器
    public ForkJoinSumCalculate(long start, long end){
        this.start = start;
        this.end = end;
    }

    public Long compute(){
        long length = end - start;

        if(length <= THURSHOLD){
            long  sum = 0L;

            for(long i = start; i<=end; i++){
                sum += i;
            }
            return sum;
        }else{
            long middle = (start + end ) / 2;
            ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle);
            left.fork(); // 進行拆分,同時壓入線程隊列

            ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle + 1, end);
            right.fork(); // 進行拆分,同時壓入線程隊列

            return left.join() + right.join();
        }
    }
}

參考資料


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM