一、分析下面程序輸出:
/** * 分析一下這個程序的輸出 * @author mashibing */ package yxxy.c_005; public class T implements Runnable { private int count = 10; public synchronized void run() { count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } public static void main(String[] args) { T t = new T(); for(int i=0; i<5; i++) { new Thread(t, "THREAD" + i).start(); } } }
THREAD0 count = 9 THREAD4 count = 8 THREAD1 count = 7 THREAD3 count = 6 THREAD2 count = 5
分析:
二、對比上一個程序,分析這個程序的輸出:
/** * 對比上面一個小程序,分析一下這個程序的輸出 * @author mashibing */ package yxxy.c_006; public class T implements Runnable { private int count = 10; public synchronized void run() { count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } public static void main(String[] args) { for(int i=0; i<5; i++) { T t = new T(); new Thread(t, "THREAD" + i).start(); } } }
THREAD0 count = 9 THREAD4 count = 9 THREAD3 count = 9 THREAD1 count = 9 THREAD2 count = 9
分析:
三、同步和非同步方法是否可以同時調用?
/** * 同步和非同步方法是否可以同時調用? * @author mashibing */ package yxxy.c_007; public class T { public synchronized void m1() { System.out.println(Thread.currentThread().getName() + " m1 start..."); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " m1 end"); } public void m2() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " m2 "); } public static void main(String[] args) { T t = new T(); new Thread(()->t.m1(), "t1").start(); new Thread(()->t.m2(), "t2").start(); } }
t1 m1 start... t2 m2 t1 m1 end
分析:
public static void main(String[] args) { T t = new T(); new Thread(new Runnable(){ @Override public void run() { t.m1(); } }, "t1").start(); new Thread(new Runnable(){ @Override public void run() { t.m2(); } }, "t2").start(); }
public static void main(String[] args) { T t = new T(); new Thread(t::m1, "t1").start(); new Thread(t::m2, "t2").start();*/ }
四:對業務寫方法加鎖,對業務讀方法不加鎖,容易產生臟讀問題(dirtyRead)
臟讀:讀到沒有寫過程中沒有完成的數據
/** * 對業務寫方法加鎖 * 對業務讀方法不加鎖 * 容易產生臟讀問題(dirtyRead) */ package yxxy.c_008; import java.util.concurrent.TimeUnit; public class Account { String name; double balance; public synchronized void set(String name, double balance) { this.name = name; try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } this.balance = balance; } public double getBalance(String name) { return this.balance; } public static void main(String[] args) { Account a = new Account(); new Thread(()->a.set("zhangsan", 100.0)).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(a.getBalance("zhangsan")); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(a.getBalance("zhangsan")); } }
0.0 100.0
分析:
public synchronized double getBalance(String name) { return this.balance; }
五、一個同步方法可以調用另外一個同步方法:
一個線程已經擁有某個對象的鎖,再次申請的時候仍然會得到該對象的鎖.
/** * 一個同步方法可以調用另外一個同步方法,一個線程已經擁有某個對象的鎖,再次申請的時候仍然會得到該對象的鎖. * 也就是說synchronized獲得的鎖是可重入的.(可重入的意思就是獲得鎖之后還可以再獲得一遍) * @author mashibing */ package yxxy.c_009; import java.util.concurrent.TimeUnit; public class T { synchronized void m1() { System.out.println("m1 start"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } m2(); } synchronized void m2() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("m2"); } }
分析:
六、重入鎖的另外一種情形,繼承中子類的同步方法調用父類的同步方法
/** * 一個同步方法可以調用另外一個同步方法,一個線程已經擁有某個對象的鎖,再次申請的時候仍然會得到該對象的鎖. * 也就是說synchronized獲得的鎖是可重入的 * 這里是繼承中有可能發生的情形,子類調用父類的同步方法 * @author mashibing */ package yxxy.c_010; import java.util.concurrent.TimeUnit; public class T { synchronized void m() { System.out.println("m start"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("m end"); } public static void main(String[] args) { new TT().m(); } } class TT extends T { @Override synchronized void m() { System.out.println("child m start"); super.m(); System.out.println("child m end"); } }
七、synchronized同步方法如果遇到異常,鎖就會被釋放
/** * 程序在執行過程中,如果出現異常,默認情況鎖會被釋放 * 所以,在並發處理的過程中,有異常要多加小心,不然可能會發生不一致的情況。 * 比如,在一個web app處理過程中,多個servlet線程共同訪問同一個資源,這時如果異常處理不合適, * 在第一個線程中拋出異常,其他線程就會進入同步代碼區,有可能會訪問到異常產生時的數據。 * 因此要非常小心的處理同步業務邏輯中的異常 * @author mashibing */ package yxxy.c_011; import java.util.concurrent.TimeUnit; public class T { int count = 0; synchronized void m() { System.out.println(Thread.currentThread().getName() + " start"); while(true) { count ++; System.out.println(Thread.currentThread().getName() + " count = " + count); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } if(count == 5) { int i = 1/0; //此處拋出異常,鎖將被釋放,要想不被釋放,可以在這里進行catch,然后讓循環繼續 } } } public static void main(String[] args) { T t = new T(); new Thread(()->t.m(), "t1").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->t.m(), "t2").start(); } }
執行結果
t1 start t1 count = 1 t1 count = 2 t1 count = 3 t1 count = 4 t1 count = 5 t2 start t2 count = 6 Exception in thread "t1" java.lang.ArithmeticException: / by zero at yxxy.c_011.T.m(T.java:28) at yxxy.c_011.T.lambda$0(T.java:36) at java.lang.Thread.run(Thread.java:745) t2 count = 7 t2 count = 8 t2 count = 9
/** * 程序在執行過程中,如果出現異常,默認情況鎖會被釋放 * 所以,在並發處理的過程中,有異常要多加小心,不然可能會發生不一致的情況。 * 比如,在一個web app處理過程中,多個servlet線程共同訪問同一個資源,這時如果異常處理不合適, * 在第一個線程中拋出異常,其他線程就會進入同步代碼區,有可能會訪問到異常產生時的數據。 * 因此要非常小心的處理同步業務邏輯中的異常 * @author mashibing */ package yxxy.c_011; import java.util.concurrent.TimeUnit; public class T { int count = 0; synchronized void m() { System.out.println(Thread.currentThread().getName() + " start"); while(true) { count ++; System.out.println(Thread.currentThread().getName() + " count = " + count); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } if(count == 5) { try{ int i = 1/0; //此處拋出異常,鎖將被釋放,要想不被釋放,可以在這里進行catch,然后讓循環繼續 }catch(Exception e){ System.out.println(e.getMessage()); } } } } public static void main(String[] args) { T t = new T(); new Thread(()->t.m(), "t1").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->t.m(), "t2").start(); } }
t1 start t1 count = 1 t1 count = 2 t1 count = 3 t1 count = 4 t1 count = 5 / by zero t1 count = 6 t1 count = 7 t1 count = 8 t1 count = 9 t1 count = 10 t1 count = 11 t1 count = 12
八、volatile關鍵字
/** * volatile 關鍵字,使一個變量在多個線程間可見 * A B線程都用到一個變量,java默認是A線程中保留一份copy,這樣如果B線程修改了該變量,則A線程未必知道 * 使用volatile關鍵字,會讓所有線程都會讀到變量的修改值 * * 在下面的代碼中,running是存在於堆內存的t對象中 * 當線程t1開始運行的時候,會把running值從內存中讀到t1線程的工作區,在運行過程中直接使用這個copy,並不會每次都去 * 讀取堆內存,這樣,當主線程修改running的值之后,t1線程感知不到,所以不會停止運行 * * 使用volatile,將會強制所有線程都去堆內存中讀取running的值 * * 可以閱讀這篇文章進行更深入的理解 * http://www.cnblogs.com/nexiyi/p/java_memory_model_and_thread.html * * volatile並不能保證多個線程共同修改running變量時所帶來的不一致問題,也就是說volatile不能替代synchronized * @author mashibing */ package yxxy.c_012; import java.util.concurrent.TimeUnit; public class T { volatile boolean running = true; //對比一下有無volatile的情況下,整個程序運行結果的區別 void m() { System.out.println("m start"); while(running) { } System.out.println("m end!"); } public static void main(String[] args) { T t = new T(); new Thread(t::m, "t1").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } t.running = false; } }
分析:
圖:
九、volatile並不能保證多個線程共同修改running變量時所帶來的不一致問題,也就是說volatile不能替代synchronized
/** * volatile並不能保證多個線程共同修改running變量時所帶來的不一致問題,也就是說volatile不能替代synchronized * 運行下面的程序,並分析結果 * @author mashibing */ package yxxy.c_013; import java.util.ArrayList; import java.util.List; public class T { volatile int count = 0; void m() { for(int i=0; i<10000; i++) count++; } public static void main(String[] args) { T t = new T(); List<Thread> threads = new ArrayList<Thread>(); for(int i=0; i<10; i++) { threads.add(new Thread(t::m, "thread-"+i)); } threads.forEach((o)->o.start()); threads.forEach((o)->{ try { o.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(t.count); } }
volatile和synchronized區別?
十、對比上一個程序,可以用synchronized解決
/** * 解決同樣的問題的更高效的方法,使用AtomXXX類 * AtomXXX類本身方法都是原子性的,但不能保證多個方法連續調用是原子性的 * @author mashibing */ package yxxy.c_015; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; public class T { AtomicInteger count = new AtomicInteger(0); void m() { for (int i = 0; i < 10000; i++) count.incrementAndGet(); //count++ } public static void main(String[] args) { T t = new T(); List<Thread> threads = new ArrayList<Thread>(); for (int i = 0; i < 10; i++) { threads.add(new Thread(t::m, "thread-" + i)); } threads.forEach((o) -> o.start()); threads.forEach((o) -> { try { o.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(t.count); } }
運行結果:100000
十二、synchronized優化
/** * 鎖定某對象o,如果o的屬性發生改變,不影響鎖的使用 * 但是如果o變成另外一個對象,則鎖定的對象發生改變 * 應該避免將鎖定對象的引用變成另外的對象 * @author mashibing */ package yxxy.c_017; import java.util.concurrent.TimeUnit; public class T { Object o = new Object(); void m() { synchronized(o) { while(true) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } } } public static void main(String[] args) { T t = new T(); //啟動第一個線程 new Thread(t::m, "t1").start(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } //創建第二個線程 Thread t2 = new Thread(t::m, "t2"); t.o = new Object(); //鎖對象發生改變,所以t2線程得以執行,如果注釋掉這句話,線程2將永遠得不到執行機會 t2.start(); } }
分析:
十三、避免將鎖定對象的引用變成另外的對象,例子:
/** * 鎖定某對象o,如果o的屬性發生改變,不影響鎖的使用 * 但是如果o變成另外一個對象,則鎖定的對象發生改變 * 應該避免將鎖定對象的引用變成另外的對象 * @author mashibing */ package yxxy.c_017; import java.util.concurrent.TimeUnit; public class T { Object o = new Object(); void m() { synchronized(o) { while(true) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } } } public static void main(String[] args) { T t = new T(); //啟動第一個線程 new Thread(t::m, "t1").start(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } //創建第二個線程 Thread t2 = new Thread(t::m, "t2"); t.o = new Object(); //鎖對象發生改變,所以t2線程得以執行,如果注釋掉這句話,線程2將永遠得不到執行機會 t2.start(); } }
分析:
十四、不要以字符串常量作為鎖定對象
/** * 不要以字符串常量作為鎖定對象 * 在下面的例子中,m1和m2其實鎖定的是同一個對象 * 這種情況還會發生比較詭異的現象,比如你用到了一個類庫,在該類庫中代碼鎖定了字符串“Hello”, * 但是你讀不到源碼,所以你在自己的代碼中也鎖定了"Hello",這時候就有可能發生非常詭異的死鎖阻塞, * 因為你的程序和你用到的類庫不經意間使用了同一把鎖 * * jetty * * @author mashibing */ package yxxy.c_018; public class T { String s1 = "Hello"; String s2 = "Hello"; void m1() { synchronized(s1) { } } void m2() { synchronized(s2) { } } }
十五:分析一道面試題
/** * 曾經的面試題:(淘寶?) * 實現一個容器,提供兩個方法,add,size * 寫兩個線程,線程1添加10個元素到容器中,線程2實現監控元素的個數,當個數到5個時,線程2給出提示並結束 * * 分析下面這個程序,能完成這個功能嗎? * @author mashibing */ package yxxy.c_019; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; public class MyContainer1 { List lists = new ArrayList(); public void add(Object o) { lists.add(o); } public int size() { return lists.size(); } public static void main(String[] args) { MyContainer1 c = new MyContainer1(); new Thread(() -> { for(int i=0; i<10; i++) { c.add(new Object()); System.out.println("add " + i); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }, "t1").start(); new Thread(() -> { while(true) { if(c.size() == 5) { break; } } System.out.println("t2 結束"); }, "t2").start(); } }
/** * 曾經的面試題:(淘寶?) * 實現一個容器,提供兩個方法,add,size * 寫兩個線程,線程1添加10個元素到容器中,線程2實現監控元素的個數,當個數到5個時,線程2給出提示並結束 * * 給lists添加volatile之后,t2能夠接到通知,但是,t2線程的死循環很浪費cpu,如果不用死循環,該怎么做呢? * @author mashibing */ package yxxy.c_019; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; public class MyContainer2 { //添加volatile,使t2能夠得到通知 volatile List lists = new ArrayList(); public void add(Object o) { lists.add(o); } public int size() { return lists.size(); } public static void main(String[] args) { MyContainer2 c = new MyContainer2(); new Thread(() -> { for(int i=0; i<10; i++) { c.add(new Object()); System.out.println("add " + i); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }, "t1").start(); new Thread(() -> { while(true) { if(c.size() == 5) { break; } } System.out.println("t2 結束"); }, "t2").start(); } }
但是上面代碼還存在兩個問題:
1)由於沒加同步,c.size()等於5的時候,假如另外一個線程又往上增加了1個,實際上這時候已經等於6了才break,所以不是很精確;
2)浪費CPU,t2線程的死循環很浪費cpu
使用wait和notify
/** * 曾經的面試題:(淘寶?) * 實現一個容器,提供兩個方法,add,size * 寫兩個線程,線程1添加10個元素到容器中,線程2實現監控元素的個數,當個數到5個時,線程2給出提示並結束 * * 給lists添加volatile之后,t2能夠接到通知,但是,t2線程的死循環很浪費cpu,如果不用死循環,該怎么做呢? * * 這里使用wait和notify做到,wait會釋放鎖,而notify不會釋放鎖 * 需要注意的是,運用這種方法,必須要保證t2先執行,也就是首先讓t2監聽才可以 * * 閱讀下面的程序,並分析輸出結果 * 可以讀到輸出結果並不是size=5時t2退出,而是t1結束時t2才接收到通知而退出 * 想想這是為什么? * @author mashibing */ package yxxy.c_019; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; public class MyContainer3 { //添加volatile,使t2能夠得到通知 volatile List lists = new ArrayList(); public void add(Object o) { lists.add(o); } public int size() { return lists.size(); } public static void main(String[] args) { MyContainer3 c = new MyContainer3(); final Object lock = new Object(); new Thread(() -> { synchronized(lock) { System.out.println("t2啟動"); if(c.size() != 5) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("t2 結束"); } }, "t2").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); } new Thread(() -> { System.out.println("t1啟動"); synchronized(lock) { for(int i=0; i<10; i++) { c.add(new Object()); System.out.println("add " + i); if(c.size() == 5) { lock.notify(); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "t1").start(); } }
分析:
1)解釋wait和notify,notifyAll方法:
wait:讓正在運行的線程進入等待狀態,並且釋放鎖
notify:喚醒某個正在等待的線程,不能精確換新某個線程
notifyAll:喚醒所有正在等待的線程
/** * 曾經的面試題:(淘寶?) * 實現一個容器,提供兩個方法,add,size * 寫兩個線程,線程1添加10個元素到容器中,線程2實現監控元素的個數,當個數到5個時,線程2給出提示並結束 * * 給lists添加volatile之后,t2能夠接到通知,但是,t2線程的死循環很浪費cpu,如果不用死循環,該怎么做呢? * * 這里使用wait和notify做到,wait會釋放鎖,而notify不會釋放鎖 * 需要注意的是,運用這種方法,必須要保證t2先執行,也就是首先讓t2監聽才可以 * * 閱讀下面的程序,並分析輸出結果 * 可以讀到輸出結果並不是size=5時t2退出,而是t1結束時t2才接收到通知而退出 * 想想這是為什么? * * notify之后,t1必須釋放鎖,t2退出后,也必須notify,通知t1繼續執行 * 整個通信過程比較繁瑣 * @author mashibing */ package yxxy.c_019; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; public class MyContainer4 { //添加volatile,使t2能夠得到通知 volatile List lists = new ArrayList(); public void add(Object o) { lists.add(o); } public int size() { return lists.size(); } public static void main(String[] args) { MyContainer4 c = new MyContainer4(); final Object lock = new Object(); new Thread(() -> { synchronized(lock) { System.out.println("t2啟動"); if(c.size() != 5) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("t2 結束"); //通知t1繼續執行 lock.notify(); } }, "t2").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); } new Thread(() -> { System.out.println("t1啟動"); synchronized(lock) { for(int i=0; i<10; i++) { c.add(new Object()); System.out.println("add " + i); if(c.size() == 5) { lock.notify(); //釋放鎖,讓t2得以執行 try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "t1").start(); } }
流程圖:
/** * 曾經的面試題:(淘寶?) * 實現一個容器,提供兩個方法,add,size * 寫兩個線程,線程1添加10個元素到容器中,線程2實現監控元素的個數,當個數到5個時,線程2給出提示並結束 * * 給lists添加volatile之后,t2能夠接到通知,但是,t2線程的死循環很浪費cpu,如果不用死循環,該怎么做呢? * * 這里使用wait和notify做到,wait會釋放鎖,而notify不會釋放鎖 * 需要注意的是,運用這種方法,必須要保證t2先執行,也就是首先讓t2監聽才可以 * * 閱讀下面的程序,並分析輸出結果 * 可以讀到輸出結果並不是size=5時t2退出,而是t1結束時t2才接收到通知而退出 * 想想這是為什么? * * notify之后,t1必須釋放鎖,t2退出后,也必須notify,通知t1繼續執行 * 整個通信過程比較繁瑣 * * 使用Latch(門閂)替代wait notify來進行通知 * 好處是通信方式簡單,同時也可以指定等待時間 * 使用await和countdown方法替代wait和notify * CountDownLatch不涉及鎖定,當count的值為零時當前線程繼續運行 * 當不涉及同步,只是涉及線程通信的時候,用synchronized + wait/notify就顯得太重了 * 這時應該考慮countdownlatch/cyclicbarrier/semaphore * @author mashibing */ package yxxy.c_019; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class MyContainer5 { // 添加volatile,使t2能夠得到通知 volatile List lists = new ArrayList(); public void add(Object o) { lists.add(o); } public int size() { return lists.size(); } public static void main(String[] args) { MyContainer5 c = new MyContainer5(); CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { System.out.println("t2啟動"); if (c.size() != 5) { try { latch.await(); //也可以指定等待時間 //latch.await(5000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("t2 結束"); }, "t2").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); } new Thread(() -> { System.out.println("t1啟動"); for (int i = 0; i < 10; i++) { c.add(new Object()); System.out.println("add " + i); if (c.size() == 5) { // 打開門閂,讓t2得以執行 latch.countDown(); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }, "t1").start(); } }
十六:ReentrantLock
/** * reentrantlock用於替代synchronized * 本例中由於m1鎖定this,只有m1執行完畢的時候,m2才能執行 * 這里是復習synchronized最原始的語義 * @author mashibing */ package yxxy.c_020; import java.util.concurrent.TimeUnit; public class ReentrantLock1 { synchronized void m1() { for(int i=0; i<10; i++) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(i); } } synchronized void m2() { System.out.println("m2 ..."); } public static void main(String[] args) { ReentrantLock1 rl = new ReentrantLock1(); new Thread(rl::m1).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(rl::m2).start(); } }
使用ReentrantLock完成同樣功能
/** * reentrantlock用於替代synchronized * 使用reentrantlock可以完成同樣的功能 * 需要注意的是,必須要必須要必須要手動釋放鎖(重要的事情說三遍) * 使用syn鎖定的話如果遇到異常,jvm會自動釋放鎖,但是lock必須手動釋放鎖,因此經常在finally中進行鎖的釋放 * @author mashibing */ package yxxy.c_020; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLock2 { Lock lock = new ReentrantLock(); void m1() { try { lock.lock(); //synchronized(this) for (int i = 0; i < 10; i++) { TimeUnit.SECONDS.sleep(1); System.out.println(i); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } void m2() { lock.lock(); System.out.println("m2 ..."); lock.unlock(); } public static void main(String[] args) { ReentrantLock2 rl = new ReentrantLock2(); new Thread(rl::m1).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(rl::m2).start(); } }
十七:RenntrantLock的tryLock
/** * 使用reentrantlock可以進行“嘗試鎖定”tryLock,這樣無法鎖定,或者在指定時間內無法鎖定,線程可以決定是否繼續等待 * @author mashibing */ package yxxy.c_020; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLock3 { Lock lock = new ReentrantLock(); void m1() { try { lock.lock(); for (int i = 0; i < 10; i++) { TimeUnit.SECONDS.sleep(1); System.out.println(i); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } /** * 使用tryLock進行嘗試鎖定,不管鎖定與否,方法都將繼續執行 * 可以根據tryLock的返回值來判定是否鎖定 * 也可以指定tryLock的時間,由於tryLock(time)拋出異常,所以要注意unclock的處理,必須放到finally中 */ void m2() { /* boolean locked = lock.tryLock(); System.out.println("m2 ..." + locked); if(locked) lock.unlock(); */ boolean locked = false; try { locked = lock.tryLock(5, TimeUnit.SECONDS); System.out.println("m2 ..." + locked); } catch (InterruptedException e) { e.printStackTrace(); } finally { if(locked) lock.unlock(); } } public static void main(String[] args) { ReentrantLock3 rl = new ReentrantLock3(); new Thread(rl::m1).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(rl::m2).start(); } }
1 3 5 m2 ...false 7 9
十八:ReentrantLock的lockInterruptibly方法
/** * 使用ReentrantLock還可以調用lockInterruptibly方法,可以對線程interrupt方法做出響應, * 在一個線程等待鎖的過程中,可以被打斷 * * @author mashibing */ package yxxy.c_020; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; public class ReentrantLock4 { public static void main(String[] args) { Lock lock = new ReentrantLock(); Thread t1 = new Thread(()->{ try { lock.lock(); System.out.println("t1 start"); TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); System.out.println("t1 end"); } catch (InterruptedException e) { System.out.println("interrupted!"); } finally { lock.unlock(); } }); t1.start(); Thread t2 = new Thread(()->{ try { //lock.lock(); lock.lockInterruptibly(); //可以對interrupt()方法做出響應 System.out.println("t2 start"); } catch (InterruptedException e) { System.out.println("interrupted!"); } finally { lock.unlock(); } }); t2.start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } t2.interrupt(); //打斷線程2的等待 } }
t1 start interrupted! Exception in thread "Thread-1" java.lang.IllegalMonitorStateException at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151) at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261) at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457) at yxxy.c_020.ReentrantLock4.lambda$1(ReentrantLock4.java:42) at java.lang.Thread.run(Thread.java:745)
分析:
t1線程牢牢的拿到鎖之后,一直sleep不會釋放,如果t2線程中的run方法使用lock.lock(),那么t2線程就會一直傻傻的等着這把鎖,不能被其他線程打斷;
而使用lockInterruptibly()方法是可以被打斷的,主線程main調用t2.interrupt()來打斷t2,告訴他是不會拿到這把鎖的,別等了;
報錯是因為lock.unlock()這個方法報錯的,因為都沒有拿到鎖,無法unlock();是代碼的問題,應該判斷有鎖,已經鎖定的情況下才lock.unlock();
十九:ReentrantLock還可以指定為公平鎖
公平鎖:等待時間長的線程先執行
競爭鎖:多個線程一起競爭一個鎖
競爭鎖相對效率高
/** * ReentrantLock還可以指定為公平鎖 * * @author mashibing */ package yxxy.c_020; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLock5 extends Thread { private static ReentrantLock lock = new ReentrantLock(); //參數為true表示為公平鎖,請對比輸出結果 public void run() { for(int i=0; i<100; i++) { lock.lock(); try{ System.out.println(Thread.currentThread().getName()+"獲得鎖"); }finally{ lock.unlock(); } } } public static void main(String[] args) { ReentrantLock5 rl=new ReentrantLock5(); Thread th1=new Thread(rl); Thread th2=new Thread(rl); th1.start(); th2.start(); } }
二十:面試經典(生產者消費者問題)
要求:寫一個固定容量同步容器,擁有put和get方法,以及getCount方法,能夠支持2個生產者線程以及10個消費者線程的阻塞調用
復制代碼 /** * 面試題:寫一個固定容量同步容器,擁有put和get方法,以及getCount方法, * 能夠支持2個生產者線程以及10個消費者線程的阻塞調用 * * 使用wait和notify/notifyAll來實現 * * @author mashibing */ package yxxy.c_021; import java.util.LinkedList; import java.util.concurrent.TimeUnit; public class MyContainer1<T> { final private LinkedList<T> lists = new LinkedList<>(); final private int MAX = 10; //最多10個元素 private int count = 0; public synchronized void put(T t) { while(lists.size() == MAX) { //想想為什么用while而不是用if? try { this.wait(); //effective java } catch (InterruptedException e) { e.printStackTrace(); } } lists.add(t); ++count; this.notifyAll(); //通知消費者線程進行消費 } public synchronized T get() { T t = null; while(lists.size() == 0) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } t = lists.removeFirst(); count --; this.notifyAll(); //通知生產者進行生產 return t; } public static void main(String[] args) { MyContainer1<String> c = new MyContainer1<>(); //啟動消費者線程 for(int i=0; i<10; i++) { new Thread(()->{ for(int j=0; j<5; j++) System.out.println(c.get()); }, "c" + i).start(); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //啟動生產者線程 for(int i=0; i<2; i++) { new Thread(()->{ for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j); }, "p" + i).start(); } } }
1.為什么用while而不用if?
使用wati和notify寫線程程序的時候寫起來會比較費勁,使用Lock和Condition
/** * 面試題:寫一個固定容量同步容器,擁有put和get方法,以及getCount方法, * 能夠支持2個生產者線程以及10個消費者線程的阻塞調用 * * 使用Lock和Condition來實現 * 對比兩種方式,Condition的方式可以更加精確的指定哪些線程被喚醒 * * @author mashibing */ package yxxy.c_021; import java.util.LinkedList; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class MyContainer2<T> { final private LinkedList<T> lists = new LinkedList<>(); final private int MAX = 10; //最多10個元素 private int count = 0; private Lock lock = new ReentrantLock(); private Condition producer = lock.newCondition(); private Condition consumer = lock.newCondition(); public void put(T t) { try { lock.lock(); while(lists.size() == MAX) { producer.await(); } lists.add(t); ++count; consumer.signalAll(); //通知消費者線程進行消費 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public T get() { T t = null; try { lock.lock(); while(lists.size() == 0) { consumer.await(); } t = lists.removeFirst(); count --; producer.signalAll(); //通知生產者進行生產 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return t; } public static void main(String[] args) { MyContainer2<String> c = new MyContainer2<>(); //啟動消費者線程 for(int i=0; i<10; i++) { new Thread(()->{ for(int j=0; j<5; j++){ System.out.println(c.get()); } }, "c" + i).start(); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //啟動生產者線程 for(int i=0; i<2; i++) { new Thread(()->{ for(int j=0; j<25; j++) { c.put(Thread.currentThread().getName() + " " + j); } }, "p" + i).start(); } } }
使用lock和condition好處在於可以精確的通知那些線程被叫醒,哪些線程不必被叫醒,這個效率顯然要比notifyAll把所有線程全叫醒要高很多。
二十一:ThreadLocal
/** * ThreadLocal線程局部變量 */ package yxxy.c_022; import java.util.concurrent.TimeUnit; public class ThreadLocal1 { volatile static Person p = new Person(); public static void main(String[] args) { new Thread(()->{ try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(p.name); }).start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } p.name = "lisi"; }).start(); } } class Person { String name = "zhangsan"; }
/** * ThreadLocal線程局部變量 * * ThreadLocal是使用空間換時間,synchronized是使用時間換空間 * 比如在hibernate中session就存在與ThreadLocal中,避免synchronized的使用 * * 運行下面的程序,理解ThreadLocal */ package yxxy.c_022; import java.util.concurrent.TimeUnit; public class ThreadLocal2 { static ThreadLocal<Person> tl = new ThreadLocal<>(); public static void main(String[] args) { new Thread(()->{ try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(tl.get()); }).start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } tl.set(new Person()); }).start(); } static class Person { String name = "zhangsan"; } }
console輸出:null
二十二:高並發容器
一、需求背景:
有N張火車票,每張票都有一個編號,同時有10個窗口對外售票, 請寫一個模擬程序。
分析下面的程序可能會產生哪些問題?重復銷售?超量銷售?
/** * 有N張火車票,每張票都有一個編號 * 同時有10個窗口對外售票 * 請寫一個模擬程序 * * 分析下面的程序可能會產生哪些問題? * 重復銷售?超量銷售? * * @author 馬士兵 */ package yxxy.c_024; import java.util.ArrayList; import java.util.List; public class TicketSeller1 { static List<String> tickets = new ArrayList<>(); static { for(int i=0; i<10000; i++) tickets.add("票編號:" + i); } public static void main(String[] args) { for(int i=0; i<10; i++) { new Thread(()->{ while(tickets.size() > 0) { System.out.println("銷售了--" + tickets.remove(0)); } }).start(); } } }
可能賣重;一張票可能對多個線程同時remove(0),所以可能一張票被賣出去多次;也可能最后一張票的時候都被多個線程remove(),程序會報錯,總之,不加鎖是不行的。
ArrayList不是同步的,remove、add等各種方法全都不是同步的;一定會出問題;
二、使用Vector
/** * 使用Vector或者Collections.synchronizedXXX * 分析一下,這樣能解決問題嗎? * * @author 馬士兵 */ package yxxy.c_024; import java.util.Vector; import java.util.concurrent.TimeUnit; public class TicketSeller2 { static Vector<String> tickets = new Vector<>(); static { for(int i=0; i<1000; i++) tickets.add("票 編號:" + i); } public static void main(String[] args) { for(int i=0; i<10; i++) { new Thread(()->{ while(tickets.size() > 0) { try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("銷售了--" + tickets.remove(0)); } }).start(); } } }
Vector是一個同步容器,所有的方法都是加鎖的;
三、使用synchronized加鎖:
/** * 就算操作A和B都是同步的,但A和B組成的復合操作也未必是同步的,仍然需要自己進行同步 * 就像這個程序,判斷size和進行remove必須是一整個的原子操作 * * @author 馬士兵 */ package yxxy.c_024; import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; public class TicketSeller3 { static List<String> tickets = new LinkedList<>(); static { for(int i=0; i<1000; i++) tickets.add("票 編號:" + i); } public static void main(String[] args) { for(int i=0; i<10; i++) { new Thread(()->{ while(true) { synchronized(tickets) { if(tickets.size() <= 0) break; try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("銷售了--" + tickets.remove(0)); } } }).start(); } } }
四、使用ConcurrentLinkedQueue提供並發性
/** * 使用ConcurrentQueue提高並發性 * * @author 馬士兵 */ package yxxy.c_024; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public class TicketSeller4 { static Queue<String> tickets = new ConcurrentLinkedQueue<>(); static { for(int i=0; i<1000; i++) tickets.add("票 編號:" + i); } public static void main(String[] args) { for(int i=0; i<10; i++) { new Thread(()->{ while(true) { String s = tickets.poll(); if(s == null) { break; }else { System.out.println("銷售了--" + s); } } }).start(); } } }
五、ConcurrentHashMap
復制代碼 /** * http://blog.csdn.net/sunxianghuang/article/details/52221913 * http://www.educity.cn/java/498061.html * 閱讀concurrentskiplistmap */ package yxxy.c_025; import java.util.Arrays; import java.util.Hashtable; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; public class T01_ConcurrentMap { public static void main(String[] args) { // Map<String, String> map = new ConcurrentHashMap<>(); Map<String, String> map = new ConcurrentSkipListMap<>(); //高並發並且排序 // Map<String, String> map = new Hashtable<>(); //Map<String, String> map = new HashMap<>(); //Collections.synchronizedXXX //TreeMap Random r = new Random(); Thread[] ths = new Thread[100]; CountDownLatch latch = new CountDownLatch(ths.length); long start = System.currentTimeMillis(); for(int i=0; i<ths.length; i++) { ths[i] = new Thread(()->{ for(int j=0; j<10000; j++) map.put("a" + r.nextInt(100000), "a" + r.nextInt(100000)); latch.countDown(); }); } Arrays.asList(ths).forEach(t->t.start()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println(end - start); } }
六:copyOnWriteList
復制代碼 /** * 寫時復制容器 copy on write * 多線程環境下,寫時效率低,讀時效率高 * 適合寫少讀多的環境 * @author 馬士兵 */ package yxxy.c_025; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.Vector; import java.util.concurrent.CopyOnWriteArrayList; public class T02_CopyOnWriteList { public static void main(String[] args) { List<String> lists = //new ArrayList<>(); //這個會出並發問題! //new Vector(); new CopyOnWriteArrayList<>(); Random r = new Random(); Thread[] ths = new Thread[100]; for(int i=0; i<ths.length; i++) { Runnable task = new Runnable() { @Override public void run() { for(int i=0; i<1000; i++) lists.add("a" + r.nextInt(10000)); } }; ths[i] = new Thread(task); } runAndComputeTime(ths); System.out.println(lists.size()); } static void runAndComputeTime(Thread[] ths) { long s1 = System.currentTimeMillis(); Arrays.asList(ths).forEach(t->t.start()); Arrays.asList(ths).forEach(t->{ try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long s2 = System.currentTimeMillis(); System.out.println(s2 - s1); } }
用於讀少寫多的場景
七、ConcurrentLinkedQueue:
package yxxy.c_025; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public class T04_ConcurrentQueue { public static void main(String[] args) { Queue<String> strs = new ConcurrentLinkedQueue<>(); for(int i=0; i<10; i++) { strs.offer("a" + i); //add } System.out.println(strs); System.out.println(strs.size()); System.out.println(strs.poll()); System.out.println(strs.size()); System.out.println(strs.peek()); System.out.println(strs.size()); //雙端隊列Deque } }
[a0, a1, a2, a3, a4, a5, a6, a7, a8, a9] 10 a0 9 a1 9
package yxxy.c_025; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public class T05_LinkedBlockingQueue { static BlockingQueue<String> strs = new LinkedBlockingQueue<>(); static Random r = new Random(); public static void main(String[] args) { new Thread(() -> { for (int i = 0; i < 100; i++) { try { strs.put("a" + i); //如果滿了,就會等待 TimeUnit.MILLISECONDS.sleep(r.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } }, "p1").start(); for (int i = 0; i < 5; i++) { new Thread(() -> { for (;;) { try { System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //如果空了,就會等待 } catch (InterruptedException e) { e.printStackTrace(); } } }, "c" + i).start(); } } } 復制代碼
package yxxy.c_025; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class T06_ArrayBlockingQueue { static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10); //有界隊列,最多裝10個元素 static Random r = new Random(); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10; i++) { strs.put("a" + i); } strs.put("aaa"); //滿了就會等待,程序阻塞,無限制的阻塞下去 //strs.add("aaa"); //報異常,Queue full //strs.offer("aaa"); //不會報異常,但是加不進去;boolean帶表是否加成功;這是add和offer的區別 //strs.offer("aaa", 1, TimeUnit.SECONDS); //1s鍾之后加不進去就加不進了;按時間段阻塞 System.out.println(strs); } } 復制代碼
九:DelayQueue
package yxxy.c_025; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class T07_DelayQueue { static BlockingQueue<MyTask> tasks = new DelayQueue<>(); static Random r = new Random(); static class MyTask implements Delayed { long runningTime; MyTask(long rt) { this.runningTime = rt; } @Override public int compareTo(Delayed o) { if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) return -1; else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) return 1; else return 0; } @Override public long getDelay(TimeUnit unit) { return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public String toString() { return "" + runningTime; } } public static void main(String[] args) throws InterruptedException { long now = System.currentTimeMillis(); MyTask t1 = new MyTask(now + 1000); MyTask t2 = new MyTask(now + 2000); MyTask t3 = new MyTask(now + 1500); MyTask t4 = new MyTask(now + 2500); MyTask t5 = new MyTask(now + 500); tasks.put(t1); tasks.put(t2); tasks.put(t3); tasks.put(t4); tasks.put(t5); System.out.println(tasks); for(int i=0; i<5; i++) { System.out.println(tasks.take()); } } }
console
[1534606492700, 1534606493200, 1534606493700, 1534606494700, 1534606494200]
1534606492700
1534606493200
1534606493700
1534606494200
1534606494700
可以用來執行定時任務;
package yxxy.c_025; import java.util.concurrent.LinkedTransferQueue; public class T08_TransferQueue { public static void main(String[] args) throws InterruptedException { LinkedTransferQueue<String> strs = new LinkedTransferQueue<>(); new Thread(() -> { try { System.out.println(strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); strs.transfer("aaa"); } } 復制代碼
2.如果先起生產者transfer,然后再起消費者take,程序就會阻塞住了:
package yxxy.c_025; import java.util.concurrent.LinkedTransferQueue; public class T08_TransferQueue { public static void main(String[] args) throws InterruptedException { LinkedTransferQueue<String> strs = new LinkedTransferQueue<>(); strs.transfer("aaa"); new Thread(() -> { try { System.out.println(strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
3.如果transfer換成put(或者add、offer),也不會有問題,因為不會阻塞:
package yxxy.c_025; import java.util.concurrent.LinkedTransferQueue; public class T08_TransferQueue { public static void main(String[] args) throws InterruptedException { LinkedTransferQueue<String> strs = new LinkedTransferQueue<>(); //strs.transfer("aaa"); strs.put("aaa"); new Thread(() -> { try { System.out.println(strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
十一、SynchronousQueue
package yxxy.c_025; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; public class T09_SynchronusQueue { //容量為0 public static void main(String[] args) throws InterruptedException { BlockingQueue<String> strs = new SynchronousQueue<>(); new Thread(()->{ try { System.out.println(strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); strs.put("aaa"); //阻塞等待消費者消費 //strs.add("aaa"); System.out.println(strs.size()); } }
總結:
1:對於map/set的選擇使用
HashMap 不需要多線程的情況下使用
TreeMap 不需要多線程的情況下使用
LinkedHashMap 不需要多線程的情況下使用
Hashtable 並發量比較小
Collections.sychronizedXXX 並發量比較小
ConcurrentHashMap 高並發
ConcurrentSkipListMap 高並發同時要求排好順序
2:隊列
ArrayList 不需要同步的情況
LinkedList 不需要同步的情況
Collections.synchronizedXXX 並發量低
Vector 並發量低
CopyOnWriteList 寫的時候少,讀時候多
Queue
CocurrentLinkedQueue //concurrentArrayQueue 高並發隊列
BlockingQueue 阻塞式
LinkedBQ 無界
ArrayBQ 有界
TransferQueue 直接給消費者線程,如果沒有消費者阻塞
SynchronusQueue 特殊的transferQueue,容量0
DelayQueue執行定時任務
二十三:高並發線程池
一、認識Executor、ExecutorService、Callable、Executors
/** * 認識Executor */ package yxxy.c_026; import java.util.concurrent.Executor; public class T01_MyExecutor implements Executor { public static void main(String[] args) { new T01_MyExecutor().execute(new Runnable(){ @Override public void run() { System.out.println("hello executor"); } }); } @Override public void execute(Runnable command) { //new Thread(command).run(); command.run(); } }
二、ThreadPool:
/** * 線程池的概念 */ package yxxy.c_026; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class T05_ThreadPool { public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newFixedThreadPool(5); //execute submit for (int i = 0; i < 6; i++) { service.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); } System.out.println(service); service.shutdown(); System.out.println(service.isTerminated()); System.out.println(service.isShutdown()); System.out.println(service); TimeUnit.SECONDS.sleep(5); System.out.println(service.isTerminated()); System.out.println(service.isShutdown()); System.out.println(service); } }
三、Future
/** * 認識future */ package yxxy.c_026; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; public class T06_Future { public static void main(String[] args) throws InterruptedException, ExecutionException { /*FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>(){ @Override public Integer call() throws Exception { TimeUnit.MILLISECONDS.sleep(3000); return 1000; } });*/ FutureTask<Integer> task = new FutureTask<>(()->{ TimeUnit.MILLISECONDS.sleep(3000); return 1000; }); new Thread(task).start(); System.out.println(task.get()); //阻塞 //******************************* ExecutorService service = Executors.newFixedThreadPool(5); Future<Integer> f = service.submit(()->{ TimeUnit.MILLISECONDS.sleep(5000); return 1; }); System.out.println(f.isDone()); System.out.println(f.get()); System.out.println(f.isDone()); } }
console
1000 false 1 true
Future: ExecutorService里面有submit方法,它的返回值是Future類型,因為你扔一個任務進去需要執行一段時間,未來的某一個時間點上,任務執行完了產生給你一個結果,這個Future代表的就是那個Callable的返回值;
四、並行計算的例子:
/** * 線程池的概念 * nasa */ package yxxy.c_026; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class T07_ParallelComputing { public static void main(String[] args) throws InterruptedException, ExecutionException { long start = System.currentTimeMillis(); List<Integer> results = getPrime(1, 200000); long end = System.currentTimeMillis(); System.out.println(end - start); final int cpuCoreNum = 4; ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum); MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20(越大的數計算是不是素數的時間越長) MyTask t2 = new MyTask(80001, 130000); MyTask t3 = new MyTask(130001, 170000); MyTask t4 = new MyTask(170001, 200000); Future<List<Integer>> f1 = service.submit(t1); Future<List<Integer>> f2 = service.submit(t2); Future<List<Integer>> f3 = service.submit(t3); Future<List<Integer>> f4 = service.submit(t4); start = System.currentTimeMillis(); f1.get(); f2.get(); f3.get(); f4.get(); end = System.currentTimeMillis(); System.out.println(end - start); } static class MyTask implements Callable<List<Integer>> { int startPos, endPos; MyTask(int s, int e) { this.startPos = s; this.endPos = e; } @Override public List<Integer> call() throws Exception { List<Integer> r = getPrime(startPos, endPos); return r; } } //判斷是否是質數 static boolean isPrime(int num) { for(int i=2; i<=num/2; i++) { if(num % i == 0) return false; } return true; } static List<Integer> getPrime(int start, int end) { List<Integer> results = new ArrayList<>(); for(int i=start; i<=end; i++) { if(isPrime(i)) results.add(i); } return results; } } 復制代碼
console:
2280 818
五、CachedThreadPool
package yxxy.c_026; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class T08_CachedPool { public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); System.out.println(service); for (int i = 0; i < 2; i++) { service.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); } System.out.println(service); TimeUnit.SECONDS.sleep(80); //cachedthreadPool里面的線程空閑狀態默認60s后銷毀,這里保險起見 System.out.println(service); } }
console
java.util.concurrent.ThreadPoolExecutor@7852e922[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] java.util.concurrent.ThreadPoolExecutor@7852e922[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0] pool-1-thread-2 pool-1-thread-1 java.util.concurrent.ThreadPoolExecutor@7852e922[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
六、SingleThreadExecutor
package yxxy.c_026; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class T09_SingleThreadPool { public static void main(String[] args) { ExecutorService service = Executors.newSingleThreadExecutor(); for(int i=0; i<5; i++) { final int j = i; service.execute(()->{ System.out.println(j + " " + Thread.currentThread().getName()); }); } } }
0 pool-1-thread-1 1 pool-1-thread-1 2 pool-1-thread-1 3 pool-1-thread-1 4 pool-1-thread-1
七、ScheduledThreadPool
package yxxy.c_026; import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class T10_ScheduledPool { public static void main(String[] args) { ScheduledExecutorService service = Executors.newScheduledThreadPool(4); service.scheduleAtFixedRate(()->{ try { TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }, 0, 500, TimeUnit.MILLISECONDS); } }
用於定時重復執行 某個任務
八、WorkStealingPool(工作竊取線程池,為精靈線程)
/** * */ package yxxy.c_026; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class T11_WorkStealingPool { public static void main(String[] args) throws IOException { ExecutorService service = Executors.newWorkStealingPool(); int count = Runtime.getRuntime().availableProcessors(); //看cpu多少核的;如果是4核,默認就幫你起4個線程 System.out.println(count); service.execute(new R(1000)); for(int i=0; i<count; i++){ service.execute(new R(2000)); } //由於產生的是精靈線程(守護線程、后台線程),主線程不阻塞的話,看不到輸出 System.in.read(); } static class R implements Runnable { int time; R(int t) { this.time = t; } @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(time + " " + Thread.currentThread().getName()); } } }
console
1000 ForkJoinPool-1-worker-1 ForkJoinPool-1-worker-2 ForkJoinPool-1-worker-0 ForkJoinPool-1-worker-5 ForkJoinPool-1-worker-3 ForkJoinPool-1-worker-6 ForkJoinPool-1-worker-7 ForkJoinPool-1-worker-4 ForkJoinPool-1-worker-1
工作竊取線程池:本來執行完自己的線程應該變為等待狀態,但是這個會去別的線程里面拿任務執行
workStealing用於什么場景:就說任務分配的不是很均勻,有的線程維護的任務隊列比較長,有些線程執行完任務就結束了不太合適,所以他執行完了之后可以去別的線程維護的隊列里去偷任務,這樣效率更高。
九、ForkJoinPool(有點類似歸並)
ForkJoinPool: forkjoin的意思就是如果有一個難以完成的大任務,需要計算量特別大,時間特別長,可以把大任務切分成一個個小任務,如果小任務還是太大,它還可以繼續分,至於分成多少你可以自己指定,... 分完之后,把結果進行合並,最后合並到一起join一起,產生一個總的結果。而里面任務的切分你可以自己指定,線程的啟動根據你任務切分的規則,由ForkJoinPool這個線程池自己來維護。
package yxxy.c_026; import java.io.IOException; import java.util.Arrays; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.RecursiveTask; public class T12_ForkJoinPool { static int[] nums = new int[1000000]; static final int MAX_NUM = 50000; static Random r = new Random(); static { for(int i=0; i<nums.length; i++) { nums[i] = r.nextInt(100); } System.out.println(Arrays.stream(nums).sum()); //stream api } static class AddTask extends RecursiveAction { int start, end; AddTask(int s, int e) { start = s; end = e; } @Override protected void compute() { if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; System.out.println("from:" + start + " to:" + end + " = " + sum); } else { int middle = start + (end-start)/2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); subTask1.fork(); subTask2.fork(); } } } public static void main(String[] args) throws IOException { ForkJoinPool fjp = new ForkJoinPool(); AddTask task = new AddTask(0, nums.length); fjp.execute(task); System.in.read(); } } 復制代碼
49494882 from:906250 to:937500 = 1545274 from:968750 to:1000000 = 1537201 from:593750 to:625000 = 1548289 from:718750 to:750000 = 1546396 from:468750 to:500000 = 1550373 from:843750 to:875000 = 1543421 from:218750 to:250000 = 1549856 from:93750 to:125000 = 1548384 from:562500 to:593750 = 1541814 from:812500 to:843750 = 1547885 from:187500 to:218750 = 1546831 from:687500 to:718750 = 1554064 from:437500 to:468750 = 1547434 from:937500 to:968750 = 1547676 from:875000 to:906250 = 1551839 from:62500 to:93750 = 1548576 from:531250 to:562500 = 1550943 from:656250 to:687500 = 1544991 from:156250 to:187500 = 1548367 from:406250 to:437500 = 1539881 from:125000 to:156250 = 1548128 from:500000 to:531250 = 1545229 from:781250 to:812500 = 1544296 from:625000 to:656250 = 1545283 from:375000 to:406250 = 1553931 from:31250 to:62500 = 1544024 from:750000 to:781250 = 1543573 from:343750 to:375000 = 1546407 from:0 to:31250 = 1539743 from:281250 to:312500 = 1549470 from:312500 to:343750 = 1552190 from:250000 to:281250 = 1543113
package yxxy.c_026; import java.io.IOException; import java.util.Arrays; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.RecursiveTask; public class T12_ForkJoinPool { static int[] nums = new int[1000000]; static final int MAX_NUM = 50000; static Random r = new Random(); static { for(int i=0; i<nums.length; i++) { nums[i] = r.nextInt(100); } System.out.println(Arrays.stream(nums).sum()); //stream api } static class AddTask extends RecursiveTask<Long> { int start, end; AddTask(int s, int e) { start = s; end = e; } @Override protected Long compute() { if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; return sum; } int middle = start + (end-start)/2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); subTask1.fork(); subTask2.fork(); return subTask1.join() + subTask2.join(); } } public static void main(String[] args) throws IOException { ForkJoinPool fjp = new ForkJoinPool(); AddTask task = new AddTask(0, nums.length); fjp.execute(task); long result = task.join(); System.out.println(result); } }
49498457 49498457