1、並發編程
並發通常能提高單處理器的程序性能。可是,看到這句話有些違背直覺。多線程的運行增加了線程間切換的開銷,僅僅從這個角度看,單線程運行總比多線程的性能好。但是,程序的阻塞會使得結果不一樣,當某個線程阻塞時候,其它線程仍然可以執行,因此程序仍保持運行。充分利用cpu的時間提高的性能遠大於線程間的切換帶來的性能下降。
何為阻塞:程序中某個線程由於在不滿足某些條件的情況下而導致不能夠繼續執行的現象
2、基本線程機制:
一個程序可以分為多個獨立運行任務,每個獨立任務由線程驅動執行。線程並發執行,表面看起來是同時執行,好像各自都有一個自己的CPU一樣。實際上,底層機制是CPU將時間片分給各個線程,一個時間只能有一個線程獲得CPU的時間片,也就是線程獨占CPU資源。
3、定義任務類、定義線程類
- 定義任務
package com.duoxiancheng; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { for (int i=0; i<2; i++) { new Thread(new LiftOff()).start(); } } } class LiftOff implements Runnable{ private int countDown=3; private static int taskCount=0; private final int id=taskCount++; @Override public void run() { while (countDown-->0) { System.out.println("#id=" + id + " countDown="+(countDown > 0 ? countDown : "LiftOff!")); Thread.yield(); } } }
輸出結果
#id=0 countDown=2 #id=1 countDown=2 #id=0 countDown=1 #id=1 countDown=1 #id=0 countDown=LiftOff! #id=1 countDown=LiftOff!
- 定義線程類
package com.duoxiancheng; public class Main { public static void main(String[] args) { for (int i=0; i<2; i++) { new LiftOff().start(); } } } class LiftOff extends Thread{ private int countDown=3; private static int taskCount=0; private final int id=taskCount++; @Override public void run() { while (countDown-->0) { System.out.println("#id=" + id + " countDown="+(countDown > 0 ? countDown : "LiftOff!")); Thread.yield(); } } }
4、線程池
待續...
5、從任務中產生返回值---Callable<T>接口
package com.duoxiancheng; import java.util.concurrent.*; public class Main1 { public static void main(String[] args) throws Exception{ ExecutorService executorService= Executors.newCachedThreadPool(); for (int i=0;i<50;i++){ Future future = executorService.submit(new TaskWithResult(i)); Object result = future.get(); System.out.println(result); } } } class TaskWithResult implements Callable{ private int id=0; public TaskWithResult(int id){ this.id=id; } @Override public Object call() throws Exception { return id; } }
其中 future.get()是阻塞的方法;如果想想立即阻塞任務的等待,則可以使用 result = exec.submit(aCallable).get(); 形式
6、常用方法
休眠---sleep()
讓步---yield()
加入一個線程---join()
優先級--setPriority()/getPriority()
后台線程--setDaemon()/isDaemon()
...
7、捕獲線程中的異常
線程中拋出異常,會傳播到控制台,除非采用特殊手段。
public interface Runnable { public abstract void run(); }
- 在run()方法內部try-catch-finally捕獲異常
- 使用異常處理器捕獲異常--異常處理器實現Thread.UncaughtExceptionHandler接口
以下分析自定義異常處理器:
為線程設置異常處理器。具體做法可以是以下幾種:
(1)Thread.setUncaughtExceptionHandler設置當前線程的異常處理器;
(2)Thread.setDefaultUncaughtExceptionHandler為整個程序設置默認的異常處理器;
如果當前線程有異常處理器(默認沒有),則優先使用該UncaughtExceptionHandler類;否則,如果當前線程所屬的線程組有異常處理器,則使用線程組的
UncaughtExceptionHandler;否則,使用全局默認的DefaultUncaughtExceptionHandler;如果都沒有的話,子線程就會退出
package com.duoxiancheng; import java.util.concurrent.ThreadFactory; public class Main2 { public static void main(String[] args) { Thread thread=new Thread(new ExceptionThread()); thread.setUncaughtExceptionHandler(new MyExceptionHandler()); thread.start(); } } /** * 任務類 */ class ExceptionThread implements Runnable{ @Override public void run() { Thread t = Thread.currentThread(); System.out.println("ExceptionThread 當前線程信息:"+t.toString()); System.out.println("當前線程ExceptionThread的異常處理器" +t.getUncaughtExceptionHandler()); throw new RuntimeException(); } } /** * 線程異常處理器 */ class MyExceptionHandler implements Thread.UncaughtExceptionHandler { @Override public void uncaughtException(Thread t, Throwable e) { System.out.println("拋出的異常是:"+e); } }
8、共享資源
- 共享資源競爭:
導致線程安全問題
- 解決思想:
多人(線程)都希望單獨使用浴室(共享資源)。為了使用浴室,一個人先敲門,看能不能使用。如果沒人回話,他就進入浴室並鎖上門(獲得鎖)。這時候,其它人想使用浴室的話,就會被阻擋在外面(不能獲取鎖),直到浴室可以使用。浴室外面的人沒有排隊,浴室門打開(前一個人釋放鎖),離門最近的人優先進入使用(獲得鎖,設置優先級和yield方法可以建議某個優先使用)。
- 解決方式:
Synchronized 、Lock鎖同步以及Voliate修飾符和原子類
線程本地存儲---ThreadLocal
9、線程之間協作
- 生產者與消費者
package com.duoxiancheng; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 生產者-消費者 */ public class Main3 { public static void main(String[ ] args) throws InterruptedException { Restaurant restaurant=new Restaurant(); new Thread(new ProductorThread(restaurant)).start(); Thread.sleep(20); new Thread(new ConsumerThread(restaurant)).start(); } } class Restaurant { Lock lock=new ReentrantLock();//鎖 Condition condition1=lock.newCondition();//條件1 Condition condition2=lock.newCondition();//條件2 private int count;//已做好的餐 private int count2; /** * 消費者方法 */ public void comsumer(){ lock.lock(); try { if (count==0) { System.out.println(Thread.currentThread().getId()+"客戶 想要一份快餐!"); condition2.signalAll(); System.out.println(Thread.currentThread().getId()+"客戶 等待一份快餐!"); condition1.await(); } count--; System.out.println(Thread.currentThread().getId()+ "客戶 消費了一份快餐!"); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } /** * 生產者方法 */ public void productor(){ lock.lock(); try { condition2.await(); count++;//生產一份快餐 System.out.println(Thread.currentThread().getId()+ "廚師 制作了一份快餐!"); condition1.signalAll(); System.out.println(Thread.currentThread().getId()+"廚師 通知客戶使用"); }catch (InterruptedException e){ e.printStackTrace(); }finally { lock.unlock(); } } } /** * 消費者 */ class ConsumerThread implements Runnable{ private Restaurant restaurant; public ConsumerThread(Restaurant restaurant){ this.restaurant=restaurant; } @Override public void run() { restaurant.comsumer(); } } /** * 生產者 */ class ProductorThread implements Runnable{ private Restaurant restaurant; public ProductorThread(Restaurant restaurant){ this.restaurant=restaurant; } @Override public void run() { restaurant.productor(); } }
輸出結果:
11客戶 想要一份快餐!
11客戶 等待一份快餐!
10廚師 制作了一份快餐!
10廚師 通知客戶使用
11客戶 消費了一份快餐!
- 生產者與消費者 和 隊列
使用wait()、notifyAll() 是一種解決任務互操作問題非常低級的方式。使用同步隊列來解決任務協作問題,同步隊列在任何時刻只允許一個任務插入或移除。
java.util.concurrent.BlockingQueue接口提供了這個同步隊列,其有大量的實現。通常可以使用LinkedBlockingDeque(無界隊列) 和 ArrayBlockingDeque(固定尺寸隊列)。
消費者任務試圖從隊列中獲取對象,而該隊列此時為空,那這些隊列還可以掛起消費者任務(阻塞);當有更多元素可用時恢復消費者任務。阻塞隊列可以解決非常大量的問題。
package com.duoxiancheng; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; /** * 生產者-消費者與隊列 */ public class Main4 { public static void main(String[] args) { ExecutorService executorService= Executors.newCachedThreadPool(); LinkedBlockingDeque<Toast> toastQueue=new LinkedBlockingDeque<>(); LinkedBlockingDeque<Toast> butteredQueue=new LinkedBlockingDeque<>(); LinkedBlockingDeque<Toast> jammedQueue=new LinkedBlockingDeque<>(); executorService.execute(new Toaster(toastQueue)); executorService.execute(new Butterer(toastQueue, butteredQueue)); executorService.execute(new Jammed(butteredQueue, jammedQueue)); executorService.execute(new Eater(jammedQueue)); } } class Toast{ private Status status=Status.DRY; private final int id; public Toast(int id) { this.id = id; } public int getId() { return id; } public Status getStatus() { return status; } public void addButtered(){ status=Status.BUTTERED; } public void addJammed(){ status=Status.JAMMED; } @Override public String toString() { return "Toast "+id+" : "+status; } /** * 枚舉類型 */ public enum Status{ DRY,BUTTERED,JAMMED } } /** * 制作吐司 */ class Toaster implements Runnable{ private LinkedBlockingDeque<Toast> toastQueue; private int count; public Toaster(LinkedBlockingDeque toastQueue){ this.toastQueue=toastQueue; } @Override public void run() { try { for (int i = 0; i < 5; i++) { Toast toast = new Toast(count++); System.out.println(toast); toastQueue.put(toast); Thread.sleep(100); } } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 添加黃油 */ class Butterer implements Runnable{ private LinkedBlockingDeque<Toast> toastQueue; private LinkedBlockingDeque<Toast> butteredQueue; public Butterer(LinkedBlockingDeque toastQueue,LinkedBlockingDeque butteredQueue){ this.toastQueue=toastQueue; this.butteredQueue=butteredQueue; } @Override public void run() { try { for (int i = 0; i < 5; i++) { Toast toast = toastQueue.take(); toast.addButtered(); System.out.println("添加黃油, " + toast); butteredQueue.put(toast); } }catch (InterruptedException e){ e.printStackTrace(); } } } /** * 添加果醬 */ class Jammed implements Runnable{ private LinkedBlockingDeque<Toast> butteredQueue; private LinkedBlockingDeque<Toast> jammedQueue; public Jammed(LinkedBlockingDeque butteredQueue, LinkedBlockingDeque jammedQueue){ this.butteredQueue=butteredQueue; this.jammedQueue=jammedQueue; } @Override public void run() { try { for (int i = 0; i < 5; i++) { Toast toast = butteredQueue.take(); toast.addJammed(); System.out.println("添加果醬, " + toast); jammedQueue.put(toast); } }catch (InterruptedException e){ e.printStackTrace(); } } } /** * 消費吐司 */ class Eater implements Runnable{ private LinkedBlockingDeque<Toast> jammedQueue; public Eater(LinkedBlockingDeque jammedQueue){ this.jammedQueue=jammedQueue; } private int counter; @Override public void run() { try { for (int i = 0; i < 5; i++) { Toast toast = jammedQueue.take(); if (toast.getStatus() != Toast.Status.JAMMED) { System.out.println("=====ERROR====="); } else { System.out.println("消費了一個吐司"); } } } catch (InterruptedException e) { e.printStackTrace(); } } }
輸出結果:
Toast 0 : DRY 添加黃油, Toast 0 : BUTTERED 添加果醬, Toast 0 : JAMMED 消費了一個吐司 Toast 1 : DRY 添加黃油, Toast 1 : BUTTERED 添加果醬, Toast 1 : JAMMED 消費了一個吐司 Toast 2 : DRY 添加黃油, Toast 2 : BUTTERED 添加果醬, Toast 2 : JAMMED 消費了一個吐司 Toast 3 : DRY 添加黃油, Toast 3 : BUTTERED 添加果醬, Toast 3 : JAMMED 消費了一個吐司 Toast 4 : DRY 添加黃油, Toast 4 : BUTTERED 添加果醬, Toast 4 : JAMMED 消費了一個吐司
10、死鎖
某個線程在等待另一個線程釋放鎖,而后者又在等別的線程的所,這樣一直下去,直到整個鏈條上的線程又在等第一個線程釋放鎖。這樣線程之間的相互等待的連續循環,導致沒有哪個線程能繼續執行的現象。
- 死鎖四大條件:
請求與保持
不可剝奪
互斥條件
循環等待
- 防止死鎖:
發生死鎖必須全部滿足四大條件,要避免死鎖,只需破壞其中一個即可。