線程
首先了解線程的五大狀態:新建,就緒,運行,阻塞,終結。
1、新建狀態(New):新創建了一個線程對象。
2、就緒狀態(Runnable):線程對象創建后,其他線程調用了該對象的start()方法。該狀態的線程位於“可運行線程池”中,變得可運行,只等待獲取CPU的使用權。即在就緒狀態的進程除CPU之外,其它的運行所需資源都已全部獲得。
3、運行狀態(Running):就緒狀態的線程獲取了CPU,執行程序代碼。
4、阻塞狀態(Blocked):阻塞狀態是線程因為某種原因放棄CPU使用權,暫時停止運行。直到線程進入就緒狀態,才有機會轉到運行狀態。
阻塞的情況分三種:
(1)、等待阻塞:運行的線程執行wait()方法,該線程會釋放占用的所有資源,JVM會把該線程放入“等待池”中。進入這個狀態后,是不能自動喚醒的,必須依靠其他線程調用notify()或notifyAll()方法才能被喚醒,
(2)、同步阻塞:運行的線程在獲取對象的同步鎖時,若該同步鎖被別的線程占用,則JVM會把該線程放入“鎖池”中。
(3)、其他阻塞:運行的線程執行sleep()或join()方法,或者發出了I/O請求時,JVM會把該線程置為阻塞狀態。當sleep()狀態超時、join()等待線程終止或者超時、或者I/O處理完畢時,線程重新轉入就緒狀態。
5、死亡狀態(Dead):線程執行完了或者因異常退出了run()方法,該線程結束生命周期。
線程變化的狀態轉換圖如下:
線程的實現方法
實現有兩種方式,一是繼承Thread類,二是實現Runnable接口,但不管怎樣, 當我們new了這個對象后,線程就進入了初始狀態; 當該對象調用了start()方法,就進入就緒狀態!
在工作中,因為需要獲取返回值的線程,接觸到了callable接口
Callable需要實現的是call()方法,而不是run()方法,返回值的類型有Callable的類型參數指定, Callable只能由ExecutorService.submit() 執行,正常結束后將返回一個future對象,實例如下:
ExecutorService pool = Executors.newFixedThreadPool(10); Callable<String> c1 = new YkuploadRunable("); Future<String> f1 = pool.submit(c1); System.out.println(f1.get()); pool.shutdown();
線程的經典問題:生產者和消費者問題
生產者可以將產品放入倉庫,消費者則可以從倉庫中取走產品,要有保護倉庫為空或溢出
解決生產者/消費者問題的方法可分為兩類:
1.采用某種機制保護生產者和消費者之間的同步;
2.在生產者和消費者之間建立一個管道。
實現方法有
1.object.wait(),object.notify();
2.condition.await(),condition.signal();
3.LinkedBlockingQueue<Object> list; put(),take()會自動阻塞
方法一的具體代碼
倉庫.java
public class Storage { // 倉庫最大存儲量 private final int MAX_SIZE = 100; // 倉庫存儲的載體 private LinkedList<Object> list = new LinkedList<Object>(); // 生產產品 public void produce(String producer) { synchronized (list) { // 如果倉庫已滿 while (list.size() == MAX_SIZE) { System.out.println("倉庫已滿,【"+producer+"】: 暫時不能執行生產任務!"); try { // 由於條件不滿足,生產阻塞 list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 生產產品 list.add(new Object()); System.out.println("【"+producer+"】:生產了一個產品\t【現倉儲量為】:" + list.size()); list.notifyAll(); } } // 消費產品 public void consume(String consumer) { synchronized (list) { //如果倉庫存儲量不足 while (list.size()==0) { System.out.println("倉庫已空,【"+consumer+"】: 暫時不能執行消費任務!"); try { // 由於條件不滿足,消費阻塞 list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.remove(); System.out.println("【"+consumer+"】:消費了一個產品\t【現倉儲量為】:" + list.size()); list.notifyAll(); } } public LinkedList<Object> getList() { return list; } public void setList(LinkedList<Object> list) { this.list = list; } public int getMAX_SIZE() { return MAX_SIZE; } }
生產者.java
public class Producer extends Thread { private String producer; private Storage storage; public Producer(Storage storage) { this.storage = storage; } @Override public void run() { produce(producer); } public void produce(String producer) { storage.produce(producer); } public String getProducer() { return producer; } public void setProducer(String producer) { this.producer = producer; } public Storage getStorage() { return storage; } public void setStorage(Storage storage) { this.storage = storage; } }
消費者.java
public class Consumer extends Thread { private String consumer; private Storage storage; public Consumer(Storage storage) { this.storage = storage; } @Override public void run() { consume(consumer); } public void consume(String consumer) { storage.consume(consumer); } public Storage getStorage() { return storage; } public void setStorage(Storage storage) { this.storage = storage; } public String getConsumer() { return consumer; } public void setConsumer(String consumer) { this.consumer = consumer; } }
方法二的實現如下:注意lock的lock(),unlock()
public class Storage { // 倉庫最大存儲量 private final int MAX_SIZE = 100; // 倉庫存儲的載體 private LinkedList<Object> list = new LinkedList<Object>(); // 鎖 private final Lock lock = new ReentrantLock(); // 倉庫滿的條件變量 private final Condition full = lock.newCondition(); // 倉庫空的條件變量 private final Condition empty = lock.newCondition(); // 生產產品 public void produce(String producer) { lock.lock(); // 如果倉庫已滿 while (list.size() == MAX_SIZE) { System.out.println("倉庫已滿,【" + producer + "】: 暫時不能執行生產任務!"); try { // 由於條件不滿足,生產阻塞 full.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 生產產品 list.add(new Object()); System.out.println("【" + producer + "】:生產了一個產品\t【現倉儲量為】:" + list.size()); empty.signalAll(); // 釋放鎖 lock.unlock(); } // 消費產品 public void consume(String consumer) { // 獲得鎖 lock.lock(); // 如果倉庫存儲量不足 while (list.size() == 0) { System.out.println("倉庫已空,【" + consumer + "】: 暫時不能執行消費任務!"); try { // 由於條件不滿足,消費阻塞 empty.await(); } catch (InterruptedException e) { e.printStackTrace(); } } list.remove(); System.out.println("【" + consumer + "】:消費了一個產品\t【現倉儲量為】:" + list.size()); full.signalAll(); // 釋放鎖 lock.unlock(); } public LinkedList<Object> getList() { return list; } public void setList(LinkedList<Object> list) { this.list = list; } public int getMAX_SIZE() { return MAX_SIZE; } }
方法三的實現如下:
import java.util.concurrent.LinkedBlockingQueue; public class Storage { // 倉庫最大存儲量 private final int MAX_SIZE = 100; // 倉庫存儲的載體 private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<Object>(100); // 生產產品 public void produce(String producer) { // 如果倉庫已滿 if (list.size() == MAX_SIZE) { System.out.println("倉庫已滿,【" + producer + "】: 暫時不能執行生產任務!"); } // 生產產品 try { list.put(new Object()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("【" + producer + "】:生產了一個產品\t【現倉儲量為】:" + list.size()); } // 消費產品 public void consume(String consumer) { // 如果倉庫存儲量不足 if (list.size() == 0) { System.out.println("倉庫已空,【" + consumer + "】: 暫時不能執行消費任務!"); } try { list.take(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("【" + consumer + "】:消費了一個產品\t【現倉儲量為】:" + list.size()); } public LinkedBlockingQueue<Object> getList() { return list; } public void setList(LinkedBlockingQueue<Object> list) { this.list = list; } public int getMAX_SIZE() { return MAX_SIZE; } }
線程同步
同步鎖synchronized
上面的wait(),await()
還有一種join方法,使線程順序進行。
線程實戰
因為需要在controller中同時上傳多個視頻並保存返回的視頻id進數據庫,所以在controller中寫了一個內部類,具體代碼如下:
class Ykupload implements Runnable { @Override public void run() { System.out.println(tpVideoLibrary.toString()); YkuploadRunable yp = new YkuploadRunable(); //獲取yp的數據並寫進數據庫 } }
后記:貌似不能同時上傳同一個文件,多個視頻上傳速度還是取決於你的帶寬!.
線程池
線程池,顧名思義存放線程的池子,可以類比數據庫的連接池。因為頻繁地創建和銷毀線程會給服務器帶來很大的壓力。若能將創建的線程不再銷毀而是存放在池中等待下一個任務使用,可以不僅減少了創建和銷毀線程所用的時間,提高了性能,同時還減輕了服務器的壓力。
初始化線程池有五個核心參數,分別是 corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue。還有兩個默認參數 threadFactory, handler
corePoolSize:線程池初始核心線程數。初始化線程池的時候,池內是沒有線程,只有在執行任務的時會創建線程。
maximumPoolSize:線程池允許存在的最大線程數。若超過該數字,默認提示RejectedExecutionException異常
keepAliveTime:當前線程數大於核心線程時,該參數生效,其目的是終止多余的空閑線程等待新任務的最長時間。即指定時間內將還未接收任務的線程銷毀。
unit:keepAliveTime 的時間單位
workQueue:緩存任務的的隊列,一般采用LinkedBlockingQueue。
threadFactory:執行程序創建新線程時使用的工廠,一般采用默認值。
handler:超出線程范圍和隊列容量而使執行被阻塞時所使用的處理程序,一般采用默認值。
在接收任務前,線程池內是沒有線程。只有當任務來了才開始新建線程。當任務數大於核心線程數時,任務進入隊列中等待。若隊列滿了,則線程池新增線程直到最大線程數。再超過則會執行拒絕策略。
舉個栗子:
/** 若有Thread1、Thread2、Thread3、Thread4四條線程分別統計C、D、E、F四個盤的大小,所有線程都統計完畢交給Thread5線程去做匯總,應當如何實現 **/ import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ITDragonThreads { public static void main(String[] args) throws Exception { // 無緩沖無界線程池 ExecutorService executor = Executors.newFixedThreadPool(8); /** * CompletionService與ExecutorService最主要的區別在於 *前者submit的task不一定是按照加入時的順序完成的。CompletionService對ExecutorService進行了包裝, *內部維護一個保存Future對象的BlockingQueue。 *只有當這個Future對象狀態是結束的時候,才會加入到這個Queue中,take()方法其實就是Producer-Consumer中的Consumer。 *它會從Queue中取出Future對象,如果Queue是空的,就會阻塞在那里,直到有 完成的Future對象加入到Queue中。 *所以,先完成的必定先被取出。這樣就減少了不必要的等待時間。 **/ CompletionService<Long> completion = new ExecutorCompletionService<Long>(executor); CountWorker countWorker = null; for (int i = 0; i < 4; i++) { // 四個線程負責統計 countWorker = new CountWorker(i+1); completion.submit(countWorker); } // 關閉線程池 //shutdown()阻止新來的任務提交,對已經提交了的任務不會產生任何影響。當已經提交的任務執行完后,它會將那些閑置的線程(idleWorks)進行中斷,這個過程是異步的。 executor.shutdown(); // 主線程相當於第五個線程,用於匯總數據 long total = 0; for (int i = 0; i < 4; i++) { total += completion.take().get(); } System.out.println(total / 1024 / 1024 / 1024 +"G"); } } class CountWorker implements Callable<Long>{ private Integer type; public CountWorker() { } public CountWorker(Integer type) { this.type = type; } @Override public Long call() throws Exception { ArrayList<String> paths = new ArrayList<>(Arrays.asList("c:", "d:", "e:", "f:")); return countDiskSpace(paths.get(type - 1)); } // 統計磁盤大小 private Long countDiskSpace (String path) { File file = new File(path); long totalSpace = file.getTotalSpace(); System.out.println(path + " 總空間大小 : " + totalSpace / 1024 / 1024 / 1024 + "G"); return totalSpace; } }