線程
首先了解線程的五大狀態:新建,就緒,運行,阻塞,終結。
1、新建狀態(New):新創建了一個線程對象。
2、就緒狀態(Runnable):線程對象創建后,其他線程調用了該對象的start()方法。該狀態的線程位於“可運行線程池”中,變得可運行,只等待獲取CPU的使用權。即在就緒狀態的進程除CPU之外,其它的運行所需資源都已全部獲得。
3、運行狀態(Running):就緒狀態的線程獲取了CPU,執行程序代碼。
4、阻塞狀態(Blocked):阻塞狀態是線程因為某種原因放棄CPU使用權,暫時停止運行。直到線程進入就緒狀態,才有機會轉到運行狀態。
阻塞的情況分三種:
(1)、等待阻塞:運行的線程執行wait()方法,該線程會釋放占用的所有資源,JVM會把該線程放入“等待池”中。進入這個狀態后,是不能自動喚醒的,必須依靠其他線程調用notify()或notifyAll()方法才能被喚醒,
(2)、同步阻塞:運行的線程在獲取對象的同步鎖時,若該同步鎖被別的線程占用,則JVM會把該線程放入“鎖池”中。
(3)、其他阻塞:運行的線程執行sleep()或join()方法,或者發出了I/O請求時,JVM會把該線程置為阻塞狀態。當sleep()狀態超時、join()等待線程終止或者超時、或者I/O處理完畢時,線程重新轉入就緒狀態。
5、死亡狀態(Dead):線程執行完了或者因異常退出了run()方法,該線程結束生命周期。
線程變化的狀態轉換圖如下:
線程的實現方法
實現有兩種方式,一是繼承Thread類,二是實現Runnable接口,但不管怎樣, 當我們new了這個對象后,線程就進入了初始狀態; 當該對象調用了start()方法,就進入就緒狀態!
在工作中,因為需要獲取返回值的線程,接觸到了callable接口
Callable需要實現的是call()方法,而不是run()方法,返回值的類型有Callable的類型參數指定, Callable只能由ExecutorService.submit() 執行,正常結束后將返回一個future對象,實例如下:
ExecutorService pool = Executors.newFixedThreadPool(10);
Callable<String> c1 = new YkuploadRunable(");
Future<String> f1 = pool.submit(c1);
System.out.println(f1.get());
pool.shutdown();
線程的經典問題:生產者和消費者問題
生產者可以將產品放入倉庫,消費者則可以從倉庫中取走產品,要有保護倉庫為空或溢出
解決生產者/消費者問題的方法可分為兩類:
1.采用某種機制保護生產者和消費者之間的同步;
2.在生產者和消費者之間建立一個管道。
實現方法有
1.object.wait(),object.notify();
2.condition.await(),condition.signal();
3.LinkedBlockingQueue<Object> list; put(),take()會自動阻塞
方法一的具體代碼
倉庫.java
public class Storage
{
// 倉庫最大存儲量
private final int MAX_SIZE = 100;
// 倉庫存儲的載體
private LinkedList<Object> list = new LinkedList<Object>();
// 生產產品
public void produce(String producer)
{
synchronized (list)
{
// 如果倉庫已滿
while (list.size() == MAX_SIZE)
{
System.out.println("倉庫已滿,【"+producer+"】: 暫時不能執行生產任務!");
try
{
// 由於條件不滿足,生產阻塞
list.wait();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
// 生產產品
list.add(new Object());
System.out.println("【"+producer+"】:生產了一個產品\t【現倉儲量為】:" + list.size());
list.notifyAll();
}
}
// 消費產品
public void consume(String consumer)
{
synchronized (list)
{
//如果倉庫存儲量不足
while (list.size()==0)
{
System.out.println("倉庫已空,【"+consumer+"】: 暫時不能執行消費任務!");
try
{
// 由於條件不滿足,消費阻塞
list.wait();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
list.remove();
System.out.println("【"+consumer+"】:消費了一個產品\t【現倉儲量為】:" + list.size());
list.notifyAll();
}
}
public LinkedList<Object> getList()
{
return list;
}
public void setList(LinkedList<Object> list)
{
this.list = list;
}
public int getMAX_SIZE()
{
return MAX_SIZE;
}
}
生產者.java
public class Producer extends Thread
{
private String producer;
private Storage storage;
public Producer(Storage storage)
{
this.storage = storage;
}
@Override
public void run()
{
produce(producer);
}
public void produce(String producer)
{
storage.produce(producer);
}
public String getProducer()
{
return producer;
}
public void setProducer(String producer)
{
this.producer = producer;
}
public Storage getStorage()
{
return storage;
}
public void setStorage(Storage storage)
{
this.storage = storage;
}
}
消費者.java
public class Consumer extends Thread
{
private String consumer;
private Storage storage;
public Consumer(Storage storage)
{
this.storage = storage;
}
@Override
public void run()
{
consume(consumer);
}
public void consume(String consumer)
{
storage.consume(consumer);
}
public Storage getStorage()
{
return storage;
}
public void setStorage(Storage storage)
{
this.storage = storage;
}
public String getConsumer() {
return consumer;
}
public void setConsumer(String consumer) {
this.consumer = consumer;
}
}
方法二的實現如下:注意lock的lock(),unlock()
public class Storage {
// 倉庫最大存儲量
private final int MAX_SIZE = 100;
// 倉庫存儲的載體
private LinkedList<Object> list = new LinkedList<Object>();
// 鎖
private final Lock lock = new ReentrantLock();
// 倉庫滿的條件變量
private final Condition full = lock.newCondition();
// 倉庫空的條件變量
private final Condition empty = lock.newCondition();
// 生產產品
public void produce(String producer) {
lock.lock();
// 如果倉庫已滿
while (list.size() == MAX_SIZE) {
System.out.println("倉庫已滿,【" + producer + "】: 暫時不能執行生產任務!");
try {
// 由於條件不滿足,生產阻塞
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生產產品
list.add(new Object());
System.out.println("【" + producer + "】:生產了一個產品\t【現倉儲量為】:" + list.size());
empty.signalAll();
// 釋放鎖
lock.unlock();
}
// 消費產品
public void consume(String consumer) {
// 獲得鎖
lock.lock();
// 如果倉庫存儲量不足
while (list.size() == 0) {
System.out.println("倉庫已空,【" + consumer + "】: 暫時不能執行消費任務!");
try {
// 由於條件不滿足,消費阻塞
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.remove();
System.out.println("【" + consumer + "】:消費了一個產品\t【現倉儲量為】:" + list.size());
full.signalAll();
// 釋放鎖
lock.unlock();
}
public LinkedList<Object> getList() {
return list;
}
public void setList(LinkedList<Object> list) {
this.list = list;
}
public int getMAX_SIZE() {
return MAX_SIZE;
}
}
方法三的實現如下:
import java.util.concurrent.LinkedBlockingQueue;
public class Storage {
// 倉庫最大存儲量
private final int MAX_SIZE = 100;
// 倉庫存儲的載體
private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<Object>(100);
// 生產產品
public void produce(String producer) {
// 如果倉庫已滿
if (list.size() == MAX_SIZE) {
System.out.println("倉庫已滿,【" + producer + "】: 暫時不能執行生產任務!");
}
// 生產產品
try {
list.put(new Object());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("【" + producer + "】:生產了一個產品\t【現倉儲量為】:" + list.size());
}
// 消費產品
public void consume(String consumer) {
// 如果倉庫存儲量不足
if (list.size() == 0) {
System.out.println("倉庫已空,【" + consumer + "】: 暫時不能執行消費任務!");
}
try {
list.take();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("【" + consumer + "】:消費了一個產品\t【現倉儲量為】:" + list.size());
}
public LinkedBlockingQueue<Object> getList() {
return list;
}
public void setList(LinkedBlockingQueue<Object> list) {
this.list = list;
}
public int getMAX_SIZE() {
return MAX_SIZE;
}
}
線程同步
同步鎖synchronized
上面的wait(),await()
還有一種join方法,使線程順序進行。
線程實戰
因為需要在controller中同時上傳多個視頻並保存返回的視頻id進數據庫,所以在controller中寫了一個內部類,具體代碼如下:
class Ykupload implements Runnable {
@Override
public void run() {
System.out.println(tpVideoLibrary.toString());
YkuploadRunable yp = new YkuploadRunable();
//獲取yp的數據並寫進數據庫
}
}
后記:貌似不能同時上傳同一個文件,多個視頻上傳速度還是取決於你的帶寬!.
線程池
線程池,顧名思義存放線程的池子,可以類比數據庫的連接池。因為頻繁地創建和銷毀線程會給服務器帶來很大的壓力。若能將創建的線程不再銷毀而是存放在池中等待下一個任務使用,可以不僅減少了創建和銷毀線程所用的時間,提高了性能,同時還減輕了服務器的壓力。
初始化線程池有五個核心參數,分別是 corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue。還有兩個默認參數 threadFactory, handler
corePoolSize:線程池初始核心線程數。初始化線程池的時候,池內是沒有線程,只有在執行任務的時會創建線程。
maximumPoolSize:線程池允許存在的最大線程數。若超過該數字,默認提示RejectedExecutionException異常
keepAliveTime:當前線程數大於核心線程時,該參數生效,其目的是終止多余的空閑線程等待新任務的最長時間。即指定時間內將還未接收任務的線程銷毀。
unit:keepAliveTime 的時間單位
workQueue:緩存任務的的隊列,一般采用LinkedBlockingQueue。
threadFactory:執行程序創建新線程時使用的工廠,一般采用默認值。
handler:超出線程范圍和隊列容量而使執行被阻塞時所使用的處理程序,一般采用默認值。
在接收任務前,線程池內是沒有線程。只有當任務來了才開始新建線程。當任務數大於核心線程數時,任務進入隊列中等待。若隊列滿了,則線程池新增線程直到最大線程數。再超過則會執行拒絕策略。
舉個栗子:
/**
若有Thread1、Thread2、Thread3、Thread4四條線程分別統計C、D、E、F四個盤的大小,所有線程都統計完畢交給Thread5線程去做匯總,應當如何實現
**/
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ITDragonThreads {
public static void main(String[] args) throws Exception {
// 無緩沖無界線程池
ExecutorService executor = Executors.newFixedThreadPool(8);
/**
* CompletionService與ExecutorService最主要的區別在於
*前者submit的task不一定是按照加入時的順序完成的。CompletionService對ExecutorService進行了包裝,
*內部維護一個保存Future對象的BlockingQueue。
*只有當這個Future對象狀態是結束的時候,才會加入到這個Queue中,take()方法其實就是Producer-Consumer中的Consumer。
*它會從Queue中取出Future對象,如果Queue是空的,就會阻塞在那里,直到有 完成的Future對象加入到Queue中。
*所以,先完成的必定先被取出。這樣就減少了不必要的等待時間。
**/
CompletionService<Long> completion = new ExecutorCompletionService<Long>(executor);
CountWorker countWorker = null;
for (int i = 0; i < 4; i++) { // 四個線程負責統計
countWorker = new CountWorker(i+1);
completion.submit(countWorker);
}
// 關閉線程池
//shutdown()阻止新來的任務提交,對已經提交了的任務不會產生任何影響。當已經提交的任務執行完后,它會將那些閑置的線程(idleWorks)進行中斷,這個過程是異步的。
executor.shutdown();
// 主線程相當於第五個線程,用於匯總數據
long total = 0;
for (int i = 0; i < 4; i++) {
total += completion.take().get();
}
System.out.println(total / 1024 / 1024 / 1024 +"G");
}
}
class CountWorker implements Callable<Long>{
private Integer type;
public CountWorker() {
}
public CountWorker(Integer type) {
this.type = type;
}
@Override
public Long call() throws Exception {
ArrayList<String> paths = new ArrayList<>(Arrays.asList("c:", "d:", "e:", "f:"));
return countDiskSpace(paths.get(type - 1));
}
// 統計磁盤大小
private Long countDiskSpace (String path) {
File file = new File(path);
long totalSpace = file.getTotalSpace();
System.out.println(path + " 總空間大小 : " + totalSpace / 1024 / 1024 / 1024 + "G");
return totalSpace;
}
}
