Java基礎學習:JUC篇


1.什么是JUC?

  JUC全稱 java.util.concurrent 是在並發編程中很常用的實用工具類

2.  volatile 關鍵字內存可見性  

  2.1 內存可見性問題,先看下面這段代碼

package juc; public class TestVolatile { public static void main(String[] args) { ThreadDemo td = new ThreadDemo(); new Thread(td).start(); while (true){ if(td.isFlag()){ System.out.println("-----------------------------"); break; } } } } class ThreadDemo implements Runnable{ private boolean flag = false; @Override public void run() { try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } flag = true; System.out.println("flag="+flag); } public boolean isFlag(){ return flag; } }

 

   將上面的代碼拿到IDEA去運行,發現控制台只打印輸出了flag=true,按照正常的情況,應該將 System.out.println("-----------------------------");,此段代碼也執行了才對,為什么這里卻沒有執行呢?這里涉及到了一個內存可見性問題,原因是此段代碼中有兩個

線程在執行,一個是主線程Main,一個是子線程,JDK會默認為每一個線程都提供一個緩存,提升效率,這就導致了一個問題,兩個線程都擁有一個緩存的flag值,子線程雖然執行了flag = true;但此時修改的flag值只是自己副本的flag值,Main也是讀取自己的flag值,

所以導致上述的問題存在。 

  PS:內存可見性問題是,當多個線程操作共享數據時,彼此不可見。

  2.2  如何解決?

    2.2.1 synchronized 關鍵字,同步鎖能保證數據的及時更新,能夠解決問題,但是這樣用會導致線程阻塞,影響效率。

while (true){ synchronized (td) { if (td.isFlag()) { System.out.println("-----------------------------"); break; } } }

    2.2.2 volatile 關鍵字:當多個線程操作共享數據時,可以保證內存中的數據可見,相較於synchronized是一種較為輕量級的同步策略。注意:1.volatile 不具備“互斥性”,2.volatile 不能保證變量的“原子性”

private volatile boolean flag = false;

 3.原子性

  3.1原子性問題,先看下面這段代碼

package juc; public class TestAtomicDemo { public static void main(String[] args) { AtomicDemo ad = new AtomicDemo(); for(int i = 0;i<10;i++){ new Thread(ad).start(); } } } class AtomicDemo implements Runnable{ private int serialNumber = 0; @Override public void run() { try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(getSerialNumber()); } public int getSerialNumber(){ return serialNumber++; } }

  將上面的代碼運行,我們發現有幾率會出現,原子性問題,那么為什么會出現此問題呢,我們得研究一下i++的原理,i++的操作實際上分為三個步驟“讀-改-寫”

int i = 10; i = i ++; int temp = i; i = i + 1; i = temp;

  通過上面的分析我們可以得出,即使在serialNumber上修飾volatile關鍵字,也無法將此問題解決,那么我們要如何解決?

  3.2 JUC( java.util.concurrent.atomic ) 提供了原子變量

    3.2.1 通過觀察里面的類,可以發現,里面的類的變量都是用volatile修飾,保證內存可見性,CAS(compare-and-swap)算法保證數據的原子性,CAS算法時硬件對於並發操作共享數據的支持,CAS包含了三個操作數:內存值V預估值A更新值 ,當且僅當

V==A時,V=B,否則,將不做任何操作

// private int serialNumber = 0;
private AtomicInteger serialNumber = new AtomicInteger(0);

    將代碼修改為原子變量,即可解決上述的原子性問題

4.ConcurrentHashMap鎖分段機制

  4.1 Java5.0在java.util.concurrent 包中提供了多種並發容器來改進同步容器的性能

  4.2 ConcurrentHashMap同步容器是Java5增加的一個線程安全的哈希表,對與多線程的操作,介於HashMap與HashTable之間。內部采用“鎖分段”機制代替Hashtable的獨占鎖。進而提高性能。

  4.3 此包還提供了設計用於多線程上下文中的Collection實現:ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、CopyOnWriteArrayList和CopyOnWriteArraySet。當期望許多線程訪問一個給定collection時, ConcurrentHashMap通

常優於同步的HashMap,ConcurrentSkipListMap通常優於同步的TreeMap.當期望的讀數和遍歷遠遠大於列表的更新數時,CopyOnWriteArrList優於同步的ArrayList

5.CountDownLatch閉鎖操作

  5.1 Java5.0在Java.util.concurrent包中提供了多種並發容器類來改進同步容器的性能

  5.2 CountDownLatch 一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。

  5.3 閉鎖可以延遲線程的進度直到其達到終止狀態,閉鎖可以用來確保某些活動直到其他活動都完成才繼續執行:

    5.3.1 確保某個計算在其需要的所有資源都被初始化之后才繼續執行

    5.3.2 確保某個服務在其依賴的所有其他服務都已經啟動之后才啟動

    5.3.3 等待直到某個操作所有參與者都准備就緒在繼續執行

  5.4 CountDownLatch:閉鎖,在完成某些運算時,只有其他所有的線程的運算全部完成,當前運算才算執行。以下代碼是用通過閉鎖計算10線程執行的時間

  5.5 CountDownLatch演示示例代碼:

package juc; import java.util.concurrent.CountDownLatch; public class TestCountDownLatch { public static void main(String[] args) { final CountDownLatch latch = new CountDownLatch(5); LatchDemo ld = new LatchDemo(latch); long start = System.currentTimeMillis(); for(int i = 0;i<5;i++){ new Thread(ld).start(); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println("耗費時間為:"+(end - start)); } } class LatchDemo implements Runnable{ private CountDownLatch latch; public LatchDemo(CountDownLatch latch){ this.latch = latch; } @Override public void run() { synchronized (this) { try { for (int i = 0; i < 50000; i++) { if (i % 2 == 0) { System.out.println(i); } } }finally { latch.countDown(); } } } }

6.實現Callable接口

  6.1 創建執行線程的方式三:實現Callble接口。相較於實現Runnable接口的方式,方法可以有返回值,並且可以拋出異常

  6.2 Callable演示示例代碼:

package juc; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class TestCallable { public static void main(String[] args) { ThreadDemo td = new ThreadDemo(); // 1.執行Callable方式,需要FutureTask實現類的支持,用於接收運算結果。
        FutureTask<Integer> result = new FutureTask<Integer>(td); new Thread(result).start(); // 2.接收線程運算后的結果
        try { Integer sum = result.get(); System.out.println(sum); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } class ThreadDemo implements Callable<Integer>{ @Override public Integer call() throws Exception { int sum = 0; for(int i = 0; i<=100;i++){ System.out.println(i); sum+=i; } return sum; } }

7.Lock同步鎖

  7.1  用於解決多線程安全問題的方式,synchronized:隱式鎖,同步代碼塊、同步方法Jdk1.5后:同步鎖Lock,是一種顯式鎖 ,需要通過lock()方式上鎖,必須通過unlock方法進行釋放鎖;

  7.2 同步鎖演示示例代碼:

package juc; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class TestLock { public static void main(String[] args) { Ticket tk = new Ticket(); new Thread(tk,"1 號").start(); new Thread(tk,"2 號").start(); new Thread(tk,"3 號").start(); } } class Ticket implements Runnable{ private int tick = 100; private Lock lock = new ReentrantLock(); @Override public void run() { while (true) { lock.lock(); try { if(tick > 0) { try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "完成售票,余票:" + --tick); } } catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } } }

8.如何使用Lock實現等待喚醒機制

  8.1 Lock實現等待喚醒機制演示示例代碼:

package juc; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class TestProductorAndConsumer { public static void main(String[] args) { Clerk clerk = new Clerk(); Productor pro = new Productor(clerk); Consumer cus = new Consumer(clerk); new Thread(pro,"生成者 A").start(); new Thread(cus,"消費者 ").start(); } } class Clerk{ private int product = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void get(){ lock.lock(); try { while (product >= 1) { System.out.println("產品已滿"); try { //this.wait();
 condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + ":" + ++product); //this.notifyAll();
 condition.signalAll(); }finally { lock.unlock(); } } public synchronized void sale(){ lock.lock(); try { while (product <= 0) { System.out.println("缺貨"); try { // this.wait();
 condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + ":" + --product); //this.notifyAll();
 condition.signalAll(); }finally { lock.unlock(); } } } class Productor implements Runnable{ private Clerk clerk; public Productor(Clerk clerk){ this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 20 ;i++){ clerk.get(); } } } class Consumer implements Runnable{ private Clerk clerk; public Consumer(Clerk clerk){ this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 20 ; i++) { clerk.sale(); } } }

9.線程按序交替

  9.1 線程按序交替演示示例代碼:

package juc; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class TestABCAlternate { public static void main(String[] args) { final AlternateDemo ad = new AlternateDemo(); new Thread(new Runnable() { @Override public void run() { for (int i = 1;i<=20;i++){ ad.loopA(i); } } },"A").start(); new Thread(new Runnable() { @Override public void run() { for (int i = 1;i<=20;i++){ ad.loopB(i); } } },"B").start(); new Thread(new Runnable() { @Override public void run() { for (int i = 1;i<=20;i++){ ad.loopC(i); System.out.println("-----------------------------------"); } } },"C").start(); } } class AlternateDemo{ private int number = 1; private Lock lock = new ReentrantLock(); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); public void loopA(int totalLoop){ lock.lock(); try { // 1. 判斷
           if(number != 1){ condition1.await(); } // 2.打印
            for(int i = 1;i<=5;i++){ System.out.println(Thread.currentThread().getName() + "\t" + i +"\t"+ totalLoop); } number = 2; condition2.signal(); } catch (Exception e){ e.printStackTrace(); } finally { lock.unlock(); } } public void loopB(int totalLoop){ lock.lock(); try{ // 1.判斷
            if(number != 2){ condition2.await(); } // 2.打印
            for(int i = 1;i<=15;i++){ System.out.println(Thread.currentThread().getName() + "\t" + i +"\t"+ totalLoop); } number = 3; condition3.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void loopC(int totalLoop){ lock.lock(); try{ // 1.判斷
            if(number != 3){ condition3.await(); } // 2.打印
            for(int i = 1;i<=20;i++){ System.out.println(Thread.currentThread().getName() + "\t" + i +"\t"+ totalLoop); } number = 1; condition1.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } }

10.ReadWriteLock 讀寫鎖

  10.1 ReadWriteLock讀寫鎖演示示例代碼:

package juc; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * 1. ReadWriteLock: 讀寫鎖 * 寫寫/讀寫 需要“互斥” * 讀讀不需要互斥 */
public class TestReadWriteLock { public static void main(String[] args) { final ReadWriteLockDemo rw = new ReadWriteLockDemo(); new Thread(new Runnable() { @Override public void run() { rw.set((int)(Math.random() * 101 )); } },"Write:").start(); for(int i = 0;i<100;i++ ){ new Thread(new Runnable() { @Override public void run() { rw.get(); } },"Read:").start(); } } } class ReadWriteLockDemo{ private int number = 0; private ReadWriteLock lock = new ReentrantReadWriteLock(); public void get(){ lock.readLock().lock(); // 上鎖
        try{ System.out.println(Thread.currentThread().getName() + ":" + number ); }finally { lock.readLock().unlock(); } } public void set(int number){ lock.writeLock().lock(); try{ System.out.println(Thread.currentThread().getName()); this.number = number; }finally { lock.writeLock().unlock(); } } }

11.線程八鎖

  11.1線程八鎖演示示例代碼:

package juc; /** * 題目:判斷打印的“one” or “two”? * * 1.兩個普通同步方法,兩個線程,標准打印,打印 // one two * 2.新增Thread.sleep() 給getOne() ,打印 // one two * 3.新增普通方法getThread(),打印 // one two * 4.兩個普通同步方法,兩個Number對象,打印// two one * 5.修改getOne() 為靜態同步方法,打印 // two one * 6.修改兩個方法均為靜態同步方法,一個Number對象 one two * 7.一個靜態同步方法,一個非靜態同步方法,兩個Number對象 two one * 8.兩個靜態同步方法,兩個Number對象 * * 線程八鎖的關鍵: * ① 非靜態方法的鎖默認為 this,靜態方法的鎖對應為Class 實例 * ② 某一個時刻內,只能有一個線程持有鎖,無論幾個方法 */
public class TestThread8Monitor { public static void main(String[] args) { final Number number = new Number(); new Thread(new Runnable() { @Override public void run() { number.getOne(); } }).start(); new Thread(new Runnable() { @Override public void run() { number.getTwo(); } }).start(); new Thread(new Runnable() { @Override public void run() { number.getThree(); } }).start(); } } class Number{ public synchronized void getOne(){ try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("One"); } public synchronized void getTwo(){ System.out.println("Two"); } public void getThree(){ System.out.println("Three"); } }

12.線程池

  12.1 線程池:提供了一個線程隊列,隊列中保存着所有等待狀態的線程,避免了創建與銷毀額外開銷,提高了響應的速度

  12.2 線程池的體系結構:

    Java.util.concurrent.Executor: 負責線程的使用與調度的根接口;

    |-- ExecutorService 子接口: 線程池的主要接口;

      |-- ThreadPoolExecutor 線程池的實現類;

      |-- ScheduledExecutorService 子接口:負責線程的調度;

        |-- ScheduledThreadPoolExecutor:繼承ThreadPoolExecutor,實現ScheduledExecutorService;

  12.3 工具類:Executors

    ExecutorService newFixedThreadPool():創建固定大小的線程池;

    ExecutorService newCachedThreadPool():緩存線程池,線程池的數量不固定,可以根據需求自動的更改數量;

    ExecutorService newSingleThreadExecutor():創建單個線程池,線程池中只有一個線程;

    ScheduledExecutorService newScheduledThreadPool():創建固定大小的線程,可以延遲或定時的執行任務;

  12.4  線程池演示示例代碼:

package juc; import java.util.concurrent.*; public class TestThreadPool { public static void main(String[] args) throws ExecutionException, InterruptedException { // 1.創建線程池
        ExecutorService pool = Executors.newFixedThreadPool(5 ); Future<Integer> future = pool.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { int num = 0; for (int i = 0; i < 100; i++) { num +=i; } return num; } }); System.out.println(future.get()); pool.shutdown(); /*ThreadPoolDemo tpd = new ThreadPoolDemo(); // 2.為線程池中的線程分配任務 for (int i = 0; i < 20; i++) { pool.submit(tpd); } // 3.關閉線程池 pool.shutdown(); */ } } class ThreadPoolDemo implements Runnable{ private int i = 0; @Override public void run() { for (int j = 0; j < 20; j++) { System.out.println(Thread.currentThread().getName() + ":" +j ); } } }

  12.5 線程調度演示示例代碼:

package juc; import java.util.Random; import java.util.concurrent.*; public class TestScheduledThreadPool { public static void main(String[] args) throws ExecutionException, InterruptedException { ScheduledExecutorService pool = Executors.newScheduledThreadPool(5); for(int i = 0;i < 5;i++) { Future<Integer> result = pool.schedule(new Callable<Integer>() { @Override public Integer call() throws Exception { int num = new Random().nextInt(100); System.out.println(Thread.currentThread().getName() + ":" + num); return num; } }, 3, TimeUnit.SECONDS); System.out.println(result.get()); } pool.shutdown(); } }

13.ForkJoinPool分支/合並框架工作竊取

  13.1 分支合並框架演示示例代碼:

package juc; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; public class TestForkJoinPool { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L,100000L); Long sum = pool.invoke(task); System.out.println(sum); } } class ForkJoinSumCalculate extends RecursiveTask<Long>{ private long start; private long end; private static final long THURSHOLD = 1000L; // 臨界值

    public ForkJoinSumCalculate(long start,long end){ this.start = start; this.end = end; } @Override protected Long compute() { long length = end - start; if(length <= THURSHOLD){ long sum = 0L; for (long i = start; i < end; i++) { sum += i; } return sum; }else { long middle = (start + end) / 2; ForkJoinSumCalculate left = new ForkJoinSumCalculate(start,middle); left.fork(); // 進行拆分,同時壓入線程隊列
 ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle+1,end); right.fork(); // 進行拆分,同時壓入線程隊列

            return left.join() + right.join(); } } }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM