線程對象是可以產生線程的對象。比如在Java平台中Thread對象,Runnable對象。線程,是指正在執行的一個指點令序列。在java平台上是指從一個線程對象的start()開始,運行run方法體中的那一段相對獨立的過程。相比於多進程,多線程的優勢有:
(1)進程之間不能共享數據,線程可以;
(2)系統創建進程需要為該進程重新分配系統資源,故創建線程代價比較小;
(3)Java語言內置了多線程功能支持,簡化了java多線程編程。
一、創建線程和啟動
(1)繼承Thread類創建線程類
通過繼承Thread類創建線程類的具體步驟和具體代碼如下:
• 定義一個繼承Thread類的子類,並重寫該類的run()方法;
• 創建Thread子類的實例,即創建了線程對象;
• 調用該線程對象的start()方法啟動線程。
class SomeThead extends Thraad { public void run() { //do something here } } public static void main(String[] args){ SomeThread oneThread = new SomeThread(); 步驟3:啟動線程: oneThread.start(); }
(2)實現Runnable接口創建線程類
通過實現Runnable接口創建線程類的具體步驟和具體代碼如下:
• 定義Runnable接口的實現類,並重寫該接口的run()方法;
• 創建Runnable實現類的實例,並以此實例作為Thread的target對象,即該Thread對象才是真正的線程對象。
class SomeRunnable implements Runnable { public void run() { //do something here } } Runnable oneRunnable = new SomeRunnable(); Thread oneThread = new Thread(oneRunnable); oneThread.start();
(3)通過Callable和Future創建線程
通過Callable和Future創建線程的具體步驟和具體代碼如下:
• 創建Callable接口的實現類,並實現call()方法,該call()方法將作為線程執行體,並且有返回值。
• 創建Callable實現類的實例,使用FutureTask類來包裝Callable對象,該FutureTask對象封裝了該Callable對象的call()方法的返回值。
• 使用FutureTask對象作為Thread對象的target創建並啟動新線程。
• 調用FutureTask對象的get()方法來獲得子線程執行結束后的返回值其中,Callable接口(也只有一個方法)定義如下:
public interface Callable { V call() throws Exception; } 步驟1:創建實現Callable接口的類SomeCallable(略); 步驟2:創建一個類對象: Callable oneCallable = new SomeCallable(); 步驟3:由Callable創建一個FutureTask對象: FutureTask oneTask = new FutureTask(oneCallable); 注釋: FutureTask是一個包裝器,它通過接受Callable來創建,它同時實現了 Future和Runnable接口。 步驟4:由FutureTask創建一個Thread對象: Thread oneThread = new Thread(oneTask); 步驟5:啟動線程: oneThread.start();
二、線程的生命周期
1、新建狀態
用new關鍵字和Thread類或其子類建立一個線程對象后,該線程對象就處於新生狀態。處於新生狀態的線程有自己的內存空間,通過調用start方法進入就緒狀態(runnable)。
注意:不能對已經啟動的線程再次調用start()方法,否則會出現Java.lang.IllegalThreadStateException異常。
2、就緒狀態
處於就緒狀態的線程已經具備了運行條件,但還沒有分配到CPU,處於線程就緒隊列(盡管是采用隊列形式,事實上,把它稱為可運行池而不是可運行隊列。因為cpu的調度不一定是按照先進先出的順序來調度的),等待系統為其分配CPU。等待狀態並不是執行狀態,當系統選定一個等待執行的Thread對象后,它就會從等待執行狀態進入執行狀態,系統挑選的動作稱之為“cpu調度”。一旦獲得CPU,線程就進入運行狀態並自動調用自己的run方法。
提示:如果希望子線程調用start()方法后立即執行,可以使用Thread.sleep()方式使主線程睡眠一伙兒,轉去執行子線程。
3、運行狀態
處於運行狀態的線程最為復雜,它可以變為阻塞狀態、就緒狀態和死亡狀態。
處於就緒狀態的線程,如果獲得了cpu的調度,就會從就緒狀態變為運行狀態,執行run()方法中的任務。如果該線程失去了cpu資源,就會又從運行狀態變為就緒狀態。重新等待系統分配資源。也可以對在運行狀態的線程調用yield()方法,它就會讓出cpu資源,再次變為就緒狀態。
注: 當發生如下情況是,線程會從運行狀態變為阻塞狀態:
①、線程調用sleep方法主動放棄所占用的系統資源
②、線程調用一個阻塞式IO方法,在該方法返回之前,該線程被阻塞
③、線程試圖獲得一個同步監視器,但更改同步監視器正被其他線程所持有
④、線程在等待某個通知(notify)
⑤、程序調用了線程的suspend方法將線程掛起。不過該方法容易導致死鎖,所以程序應該盡量避免使用該方法。
當線程的run()方法執行完,或者被強制性地終止,例如出現異常,或者調用了stop()、desyory()方法等等,就會從運行狀態轉變為死亡狀態。
4、阻塞狀態
處於運行狀態的線程在某些情況下,如執行了sleep(睡眠)方法,或等待I/O設備等資源,將讓出CPU並暫時停止自己的運行,進入阻塞狀態。
在阻塞狀態的線程不能進入就緒隊列。只有當引起阻塞的原因消除時,如睡眠時間已到,或等待的I/O設備空閑下來,線程便轉入就緒狀態,重新到就緒隊列中排隊等待,被系統選中后從原來停止的位置開始繼續運行。有三種方法可以暫停Threads執行:
5、死亡狀態
當線程的run()方法執行完,或者被強制性地終止,就認為它死去。這個線程對象也許是活的,但是,它已經不是一個單獨執行的線程。線程一旦死亡,就不能復生。 如果在一個死去的線程上調用start()方法,會拋出java.lang.IllegalThreadStateException異常。
三、線程管理
Java提供了一些便捷的方法用於會線程狀態的控制。具體如下:
1、線程睡眠——sleep
如果我們需要讓當前正在執行的線程暫停一段時間,並進入阻塞狀態,則可以通過調用Thread的sleep方法。
注:
(1)sleep是靜態方法,最好不要用Thread的實例對象調用它,因為它睡眠的始終是當前正在運行的線程,而不是調用它的線程對象,它只對正在運行狀態的線程對象有效。如下面的例子:
public class Test1 { public static void main(String[] args) throws InterruptedException { System.out.println(Thread.currentThread().getName()); MyThread myThread=new MyThread(); myThread.start(); myThread.sleep(1000);//這里sleep的就是main線程,而非myThread線程 Thread.sleep(10); for(int i=0;i<100;i++){ System.out.println("main"+i); } } }
(2)Java線程調度是Java多線程的核心,只有良好的調度,才能充分發揮系統的性能,提高程序的執行效率。但是不管程序員怎么編寫調度,只能最大限度的影響線程執行的次序,而不能做到精准控制。因為使用sleep方法之后,線程是進入阻塞狀態的,只有當睡眠的時間結束,才會重新進入到就緒狀態,而就緒狀態進入到運行狀態,是由系統控制的,我們不可能精准的去干涉它,所以如果調用Thread.sleep(1000)使得線程睡眠1秒,可能結果會大於1秒。
2、線程讓步——yield
yield()方法和sleep()方法有點相似,它也是Thread類提供的一個靜態的方法,它也可以讓當前正在執行的線程暫停,讓出cpu資源給其他的線程。但是和sleep()方法不同的是,它不會進入到阻塞狀態,而是進入到就緒狀態。yield()方法只是讓當前線程暫停一下,重新進入就緒的線程池中,讓系統的線程調度器重新調度器重新調度一次,完全可能出現這樣的情況:當某個線程調用yield()方法之后,線程調度器又將其調度出來重新進入到運行狀態執行。
實際上,當某個線程調用了yield()方法暫停之后,優先級與當前線程相同,或者優先級比當前線程更高的就緒狀態的線程更有可能獲得執行的機會,當然,只是有可能,因為我們不可能精確的干涉cpu調度線程。用法如下:
public class Test1 { public static void main(String[] args) throws InterruptedException { new MyThread("低級", 1).start(); new MyThread("中級", 5).start(); new MyThread("高級", 10).start(); } } class MyThread extends Thread { public MyThread(String name, int pro) { super(name);// 設置線程的名稱 this.setPriority(pro);// 設置優先級 } @Override public void run() { for (int i = 0; i < 30; i++) { System.out.println(this.getName() + "線程第" + i + "次執行!"); if (i % 5 == 0) Thread.yield(); } } }
注:關於sleep()方法和yield()方的區別如下:
①、sleep方法暫停當前線程后,會進入阻塞狀態,只有當睡眠時間到了,才會轉入就緒狀態。而yield方法調用后 ,是直接進入就緒狀態,所以有可能剛進入就緒狀態,又被調度到運行狀態。
②、sleep方法聲明拋出了InterruptedException,所以調用sleep方法的時候要捕獲該異常,或者顯示聲明拋出該異常。而yield方法則沒有聲明拋出任務異常。
③、sleep方法比yield方法有更好的可移植性,通常不要依靠yield方法來控制並發線程的執行。
3、線程合並——join
線程的合並的含義就是將幾個並行線程的線程合並為一個單線程執行,應用場景是當一個線程必須等待另一個線程執行完畢才能執行時,Thread類提供了join方法來完成這個功能,注意,它不是靜態方法。
從上面的方法的列表可以看到,它有3個重載的方法:
void join()
當前線程等該加入該線程后面,等待該線程終止。 void join(long millis)
當前線程等待該線程終止的時間最長為 millis 毫秒。 如果在millis時間內,該線程沒有執行完,那么當前線程進入就緒狀態,重新等待cpu調度 void join(long millis,int nanos)
等待該線程終止的時間最長為 millis 毫秒 + nanos 納秒。如果在millis時間內,該線程沒有執行完,那么當前線程進入就緒狀態,重新等待cpu調度
4、設置線程的優先級
每個線程執行時都有一個優先級的屬性,優先級高的線程可以獲得較多的執行機會,而優先級低的線程則獲得較少的執行機會。與線程休眠類似,線程的優先級仍然無法保障線程的執行次序。只不過,優先級高的線程獲取CPU資源的概率較大,優先級低的也並非沒機會執行。
每個線程默認的優先級都與創建它的父線程具有相同的優先級,在默認情況下,main線程具有普通優先級。
注:Thread類提供了setPriority(int newPriority)和getPriority()方法來設置和返回一個指定線程的優先級,其中setPriority方法的參數是一個整數,范圍是1~·0之間,也可以使用Thread類提供的三個靜態常量:
MAX_PRIORITY =10 MIN_PRIORITY =1 NORM_PRIORITY =5
public class Test1 { public static void main(String[] args) throws InterruptedException { new MyThread("高級", 10).start(); new MyThread("低級", 1).start(); } } class MyThread extends Thread { public MyThread(String name,int pro) { super(name);//設置線程的名稱 setPriority(pro);//設置線程的優先級 } @Override public void run() { for (int i = 0; i < 100; i++) { System.out.println(this.getName() + "線程第" + i + "次執行!"); } } }
注:雖然Java提供了10個優先級別,但這些優先級別需要操作系統的支持。不同的操作系統的優先級並不相同,而且也不能很好的和Java的10個優先級別對應。所以我們應該使用MAX_PRIORITY、MIN_PRIORITY和NORM_PRIORITY三個靜態常量來設定優先級,這樣才能保證程序最好的可移植性。
5、后台(守護)線程
• 守護線程通常用於執行一些后台作業,例如在你的應用程序運行時播放背景音樂,在文字編輯器里做自動語法檢查、自動保存等功能。
• Java的垃圾回收也是一個守護線程。守護線的好處就是你不需要關心它的結束問題。例如你在你的應用程序運行的時候希望播放背景音樂,如果將這個播放背景音樂的線程設定為非守護線程,那么在用戶請求退出的時候,不僅要退出主線程,還要通知播放背景音樂的線程退出;如果設定為守護線程則不需要了。
setDaemon方法的詳細說明:
public final void setDaemon(boolean on) 將該線程標記為守護線程或用戶線程。當正在運行的線程都是守護線程時,Java 虛擬機退出。 該方法必須在啟動線程前調用。 該方法首先調用該線程的 checkAccess 方法,且不帶任何參數。這可能拋出 SecurityException(在當前線程中)。 參數: on - 如果為 true,則將該線程標記為守護線程。 拋出: IllegalThreadStateException - 如果該線程處於活動狀態。 SecurityException - 如果當前線程無法修改該線程。
注:JRE判斷程序是否執行結束的標准是所有的前台執線程行完畢了,而不管后台線程的狀態,因此,在使用后台縣城時候一定要注意這個問題。
6、正確結束線程
Thread.stop()、Thread.suspend、Thread.resume、Runtime.runFinalizersOnExit這些終止線程運行的方法已經被廢棄了,使用它們是極端不安全的!想要安全有效的結束一個線程,可以使用下面的方法:
• 正常執行完run方法,然后結束掉;
• 控制循環條件和判斷條件的標識符來結束掉線程。
class MyThread extends Thread { int i=0; boolean next=true; @Override public void run() { while (next) { if(i==10) next=false; i++; System.out.println(i); } } }
四、線程同步
java允許多線程並發控制,當多個線程同時操作一個可共享的資源變量時(如數據的增刪改查),將會導致數據不准確,相互之間產生沖突,因此加入同步鎖以避免在該線程沒有完成操作之前,被其他線程的調用,從而保證了該變量的唯一性和准確性。
1、同步方法
即有synchronized關鍵字修飾的方法。由於java的每個對象都有一個內置鎖,當用此關鍵字修飾方法時,內置鎖會保護整個方法。在調用該方法前,需要獲得內置鎖,否則就處於阻塞狀態。
public synchronized void save(){}
注: synchronized關鍵字也可以修飾靜態方法,此時如果調用該靜態方法,將會鎖住整個類
2、同步代碼塊
即有synchronized關鍵字修飾的語句塊。被該關鍵字修飾的語句塊會自動被加上內置鎖,從而實現同步。
private int count =0;//賬戶余額
//存錢
public void addMoney(int money){
synchronized (this) {
count +=money;
}
System.out.println(System.currentTimeMillis()+"存進:"+money);
}
//取錢
public void subMoney(int money){
synchronized (this) {
if(count-money < 0){
System.out.println("余額不足");
return;
}
count -=money;
}
System.out.println(+System.currentTimeMillis()+"取出:"+money);
}
//查詢
public void lookMoney(){
System.out.println("賬戶余額:"+count);
}
}
注:同步是一種高開銷的操作,因此應該盡量減少同步的內容。通常沒有必要同步整個方法,使用synchronized代碼塊同步關鍵代碼即可。
3、使用特殊域變量(volatile)實現線程同步
• volatile關鍵字為域變量的訪問提供了一種免鎖機制;
• 使用volatile修飾域相當於告訴虛擬機該域可能會被其他線程更新;
• 因此每次使用該域就要重新計算,而不是使用寄存器中的值;
• volatile不會提供任何原子操作,它也不能用來修飾final類型的變量。
public class SynchronizedThread { class Bank { private volatile int account = 100; public int getAccount() { return account; } /** * 用同步方法實現 * * @param money */ public synchronized void save(int money) { account += money; } /** * 用同步代碼塊實現 * * @param money */ public void save1(int money) { synchronized (this) { account += money; } } } class NewThread implements Runnable { private Bank bank; public NewThread(Bank bank) { this.bank = bank; } @Override public void run() { for (int i = 0; i < 10; i++) { // bank.save1(10); bank.save(10); System.out.println(i + "賬戶余額為:" +bank.getAccount()); } } } /** * 建立線程,調用內部類 */ public void useThread() { Bank bank = new Bank(); NewThread new_thread = new NewThread(bank); System.out.println("線程1"); Thread thread1 = new Thread(new_thread); thread1.start(); System.out.println("線程2"); Thread thread2 = new Thread(new_thread); thread2.start(); } public static void main(String[] args) { SynchronizedThread st = new SynchronizedThread(); st.useThread(); }
注:多線程中的非同步問題主要出現在對域的讀寫上,如果讓域自身避免這個問題,則就不需要修改操作該域的方法。用final域,有鎖保護的域和volatile域可以避免非同步的問題。
4、使用重入鎖(Lock)實現線程同步
在JavaSE5.0中新增了一個java.util.concurrent包來支持同步。ReentrantLock類是可重入、互斥、實現了Lock接口的鎖,它與使用synchronized方法和快具有相同的基本行為和語義,並且擴展了其能力。ReenreantLock類的常用方法有:
ReentrantLock() : 創建一個ReentrantLock實例
lock() : 獲得鎖
unlock() : 釋放鎖
注:ReentrantLock()還有一個可以創建公平鎖的構造方法,但由於能大幅度降低程序運行效率,不推薦使用
//只給出要修改的代碼,其余代碼與上同 class Bank { private int account = 100; //需要聲明這個鎖 private Lock lock = new ReentrantLock(); public int getAccount() { return account; } //這里不再需要synchronized public void save(int money) { lock.lock(); try{ account += money; }finally{ lock.unlock(); } } }
五、線程通信
1、借助於Object類的wait()、notify()和notifyAll()實現通信
線程執行wait()后,就放棄了運行資格,處於凍結狀態;
線程運行時,內存中會建立一個線程池,凍結狀態的線程都存在於線程池中,notify()執行時喚醒的也是線程池中的線程,線程池中有多個線程時喚醒第一個被凍結的線程。
notifyall(), 喚醒線程池中所有線程。
注: (1) wait(), notify(),notifyall()都用在同步里面,因為這3個函數是對持有鎖的線程進行操作,而只有同步才有鎖,所以要使用在同步中;
(2) wait(),notify(),notifyall(), 在使用時必須標識它們所操作的線程持有的鎖,因為等待和喚醒必須是同一鎖下的線程;而鎖可以是任意對象,所以這3個方法都是Object類中的方法。
單個消費者生產者例子如下:
class Resource{ //生產者和消費者都要操作的資源 private String name; private int count=1; private boolean flag=false; public synchronized void set(String name){ if(flag) try{wait();}catch(Exception e){} this.name=name+"---"+count++; System.out.println(Thread.currentThread().getName()+"...生產者..."+this.name); flag=true; this.notify(); } public synchronized void out(){ if(!flag) try{wait();}catch(Exception e){} System.out.println(Thread.currentThread().getName()+"...消費者..."+this.name); flag=false; this.notify(); } } class Producer implements Runnable{ private Resource res; Producer(Resource res){ this.res=res; } public void run(){ while(true){ res.set("商品"); } } } class Consumer implements Runnable{ private Resource res; Consumer(Resource res){ this.res=res; } public void run(){ while(true){ res.out(); } } } public class ProducerConsumerDemo{ public static void main(String[] args){ Resource r=new Resource(); Producer pro=new Producer(r); Consumer con=new Consumer(r); Thread t1=new Thread(pro); Thread t2=new Thread(con); t1.start(); t2.start(); } }//運行結果正常,生產者生產一個商品,緊接着消費者消費一個商品。
但是如果有多個生產者和多個消費者,上面的代碼是有問題,比如2個生產者,2個消費者,運行結果就可能出現生產的1個商品生產了一次而被消費了2次,或者連續生產2個商品而只有1個被消費,這是因為此時共有4個線程在操作Resource對象r, 而notify()喚醒的是線程池中第1個wait()的線程,所以生產者執行notify()時,喚醒的線程有可能是另1個生產者線程,這個生產者線程從wait()中醒來后不會再判斷flag,而是直接向下運行打印出一個新的商品,這樣就出現了連續生產2個商品。
為了避免這種情況,修改代碼如下:
class Resource{ private String name; private int count=1; private boolean flag=false; public synchronized void set(String name){ while(flag) /*原先是if,現在改成while,這樣生產者線程從凍結狀態醒來時,還會再判斷flag.*/ try{wait();}catch(Exception e){} this.name=name+"---"+count++; System.out.println(Thread.currentThread().getName()+"...生產者..."+this.name); flag=true; this.notifyAll();/*原先是notity(), 現在改成notifyAll(),這樣生產者線程生產完一個商品后可以將等待中的消費者線程喚醒,否則只將上面改成while后,可能出現所有生產者和消費者都在wait()的情況。*/ } public synchronized void out(){ while(!flag) /*原先是if,現在改成while,這樣消費者線程從凍結狀態醒來時,還會再判斷flag.*/ try{wait();}catch(Exception e){} System.out.println(Thread.currentThread().getName()+"...消費者..."+this.name); flag=false; this.notifyAll(); /*原先是notity(), 現在改成notifyAll(),這樣消費者線程消費完一個商品后可以將等待中的生產者線程喚醒,否則只將上面改成while后,可能出現所有生產者和消費者都在wait()的情況。*/ } } public class ProducerConsumerDemo{ public static void main(String[] args){ Resource r=new Resource(); Producer pro=new Producer(r); Consumer con=new Consumer(r); Thread t1=new Thread(pro); Thread t2=new Thread(con); Thread t3=new Thread(pro); Thread t4=new Thread(con); t1.start(); t2.start(); t3.start(); t4.start(); } }
2、使用Condition控制線程通信
jdk1.5中,提供了多線程的升級解決方案為:
(1)將同步synchronized替換為顯式的Lock操作;
(2)將Object類中的wait(), notify(),notifyAll()替換成了Condition對象,該對象可以通過Lock鎖對象獲取;
(3)一個Lock對象上可以綁定多個Condition對象,這樣實現了本方線程只喚醒對方線程,而jdk1.5之前,一個同步只能有一個鎖,不同的同步只能用鎖來區分,且鎖嵌套時容易死鎖。
class Resource{ private String name; private int count=1; private boolean flag=false; private Lock lock = new ReentrantLock();/*Lock是一個接口,ReentrantLock是該接口的一個直接子類。*/ private Condition condition_pro=lock.newCondition(); /*創建代表生產者方面的Condition對象*/ private Condition condition_con=lock.newCondition(); /*使用同一個鎖,創建代表消費者方面的Condition對象*/ public void set(String name){ lock.lock();//鎖住此語句與lock.unlock()之間的代碼 try{ while(flag) condition_pro.await(); //生產者線程在conndition_pro對象上等待 this.name=name+"---"+count++; System.out.println(Thread.currentThread().getName()+"...生產者..."+this.name); flag=true; condition_con.signalAll(); } finally{ lock.unlock(); //unlock()要放在finally塊中。 } } public void out(){ lock.lock(); //鎖住此語句與lock.unlock()之間的代碼 try{ while(!flag) condition_con.await(); //消費者線程在conndition_con對象上等待 System.out.println(Thread.currentThread().getName()+"...消費者..."+this.name); flag=false; condition_pro.signqlAll(); /*喚醒所有在condition_pro對象下等待的線程,也就是喚醒所有生產者線程*/ } finally{ lock.unlock(); } } }
3、使用阻塞隊列(BlockingQueue)控制線程通信
BlockingQueue是一個接口,也是Queue的子接口。BlockingQueue具有一個特征:當生產者線程試圖向BlockingQueue中放入元素時,如果該隊列已滿,則線程被阻塞;但消費者線程試圖從BlockingQueue中取出元素時,如果隊列已空,則該線程阻塞。程序的兩個線程通過交替向BlockingQueue中放入元素、取出元素,即可很好地控制線程的通信。
BlockingQueue提供如下兩個支持阻塞的方法:
(1)put(E e):嘗試把Eu元素放如BlockingQueue中,如果該隊列的元素已滿,則阻塞該線程。
(2)take():嘗試從BlockingQueue的頭部取出元素,如果該隊列的元素已空,則阻塞該線程。
BlockingQueue繼承了Queue接口,當然也可以使用Queue接口中的方法,這些方法歸納起來可以分為如下三組:
(1)在隊列尾部插入元素,包括add(E e)、offer(E e)、put(E e)方法,當該隊列已滿時,這三個方法分別會拋出異常、返回false、阻塞隊列。
(2)在隊列頭部刪除並返回刪除的元素。包括remove()、poll()、和take()方法,當該隊列已空時,這三個方法分別會拋出異常、返回false、阻塞隊列。
(3)在隊列頭部取出但不刪除元素。包括element()和peek()方法,當隊列已空時,這兩個方法分別拋出異常、返回false。
BlockingQueue接口包含如下5個實現類:
ArrayBlockingQueue :基於數組實現的BlockingQueue隊列。
LinkedBlockingQueue:基於鏈表實現的BlockingQueue隊列。
PriorityBlockingQueue:它並不是保准的阻塞隊列,該隊列調用remove()、poll()、take()等方法提取出元素時,並不是取出隊列中存在時間最長的元素,而是隊列中最小的元素。
它判斷元素的大小即可根據元素(實現Comparable接口)的本身大小來自然排序,也可使用Comparator進行定制排序。
SynchronousQueue:同步隊列。對該隊列的存、取操作必須交替進行。
DelayQueue:它是一個特殊的BlockingQueue,底層基於PriorityBlockingQueue實現,不過,DelayQueue要求集合元素都實現Delay接口(該接口里只有一個long getDelay()方法),
DelayQueue根據集合元素的getDalay()方法的返回值進行排序。
copy的一個示例:
1 import java.util.concurrent.ArrayBlockingQueue; 2 import java.util.concurrent.BlockingQueue; 3 public class BlockingQueueTest{ 4 public static void main(String[] args)throws Exception{ 5 //創建一個容量為1的BlockingQueue 6 7 BlockingQueue<String> b=new ArrayBlockingQueue<>(1); 8 //啟動3個生產者線程 9 new Producer(b).start(); 10 new Producer(b).start(); 11 new Producer(b).start(); 12 //啟動一個消費者線程 13 new Consumer(b).start(); 14 15 } 16 } 17 class Producer extends Thread{ 18 private BlockingQueue<String> b; 19 20 public Producer(BlockingQueue<String> b){ 21 this.b=b; 22 23 } 24 public synchronized void run(){ 25 String [] str=new String[]{ 26 "java", 27 "struts", 28 "Spring" 29 }; 30 for(int i=0;i<9999999;i++){ 31 System.out.println(getName()+"生產者准備生產集合元素!"); 32 try{ 33 34 b.put(str[i%3]); 35 sleep(1000); 36 //嘗試放入元素,如果隊列已滿,則線程被阻塞 37 38 }catch(Exception e){System.out.println(e);} 39 System.out.println(getName()+"生產完成:"+b); 40 } 41 42 } 43 } 44 class Consumer extends Thread{ 45 private BlockingQueue<String> b; 46 public Consumer(BlockingQueue<String> b){ 47 this.b=b; 48 } 49 public synchronized void run(){ 50 51 while(true){ 52 System.out.println(getName()+"消費者准備消費集合元素!"); 53 try{ 54 sleep(1000); 55 //嘗試取出元素,如果隊列已空,則線程被阻塞 56 b.take(); 57 }catch(Exception e){System.out.println(e);} 58 System.out.println(getName()+"消費完:"+b); 59 } 60 61 } 62 }
六、線程池
合理利用線程池能夠帶來三個好處。
- 降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。
- 提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。
- 提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。
1、使用Executors工廠類產生線程池
Executor線程池框架的最大優點是把任務的提交和執行解耦。客戶端將要執行的任務封裝成Task,然后提交即可。而Task如何執行客戶端則是透明的。具體點講,提交一個Callable對象給ExecutorService(如最常用的線程池ThreadPoolExecutor),將得到一個Future對象,調用Future對象的get方法等待執行結果。線程池實現原理類結構圖如下:
上圖中涉及到的線程池內部實現原理的所有類,不利於我們理解線程池如何使用。我們先從客戶端的角度出發,看看客戶端使用線程池所涉及到的類結構圖:
由上圖可知,ExecutorService是Java中對線程池定義的一個接口,它java.util.concurrent
包中。 Java API對ExecutorService接口的實現有兩個,所以這兩個即是Java線程池具體實現類如下:
ThreadPoolExecutor
ScheduledThreadPoolExecutor
除此之外,ExecutorService還繼承了Executor
接口(注意區分Executor接口和Executors工廠類),這個接口只有一個execute()
方法,最后我們看一下整個繼承樹:
使用Executors執行多線程任務的步驟如下:
• 調用Executors類的靜態工廠方法創建一個ExecutorService對象,該對象代表一個線程池;
• 創建Runnable實現類或Callable實現類的實例,作為線程執行任務;
• 調用ExecutorService對象的submit()方法來提交Runnable實例或Callable實例;
• 當不想提交任務時,調用ExecutorService對象的shutdown()方法來關閉線程池。
(1)使用Executors的靜態工廠類創建線程池的方法如下:
1、newFixedThreadPool() :
作用:該方法返回一個固定線程數量的線程池,該線程池中的線程數量始終不變,即不會再創建新的線程,也不會銷毀已經創建好的線程,自始自終都是那幾個固定的線程在工作,所以該線程池可以控制線程的最大並發數。
栗子:假如有一個新任務提交時,線程池中如果有空閑的線程則立即使用空閑線程來處理任務,如果沒有,則會把這個新任務存在一個任務隊列中,一旦有線程空閑了,則按FIFO方式處理任務隊列中的任務。
2、newCachedThreadPool() :
作用:該方法返回一個可以根據實際情況調整線程池中線程的數量的線程池。即該線程池中的線程數量不確定,是根據實際情況動態調整的。
栗子:假如該線程池中的所有線程都正在工作,而此時有新任務提交,那么將會創建新的線程去處理該任務,而此時假如之前有一些線程完成了任務,現在又有新任務提交,那么將不會創建新線程去處理,而是復用空閑的線程去處理新任務。那么此時有人有疑問了,那這樣來說該線程池的線程豈不是會越集越多?其實並不會,因為線程池中的線程都有一個“保持活動時間”的參數,通過配置它,如果線程池中的空閑線程的空閑時間超過該“保存活動時間”則立刻停止該線程,而該線程池默認的“保持活動時間”為60s。
3、newSingleThreadExecutor() :
作用:該方法返回一個只有一個線程的線程池,即每次只能執行一個線程任務,多余的任務會保存到一個任務隊列中,等待這一個線程空閑,當這個線程空閑了再按FIFO方式順序執行任務隊列中的任務。
4、newScheduledThreadPool() :
作用:該方法返回一個可以控制線程池內線程定時或周期性執行某任務的線程池。
5、newSingleThreadScheduledExecutor() :
作用:該方法返回一個可以控制線程池內線程定時或周期性執行某任務的線程池。只不過和上面的區別是該線程池大小為1,而上面的可以指定線程池的大小。
注:Executors只是一個工廠類,它所有的方法返回的都是ThreadPoolExecutor
、ScheduledThreadPoolExecutor
這兩個類的實例。
(2) ExecutorService有如下幾個執行方法:
- execute(Runnable) - submit(Runnable) - submit(Callable) - invokeAny(...) - invokeAll(...)
execute(Runnable)
這個方法接收一個Runnable實例,並且異步的執行,請看下面的實例:
ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); executorService.shutdown();
submit(Runnable)
submit(Runnable)
和execute(Runnable)
區別是前者可以返回一個Future對象,通過返回的Future對象,我們可以檢查提交的任務是否執行完畢,請看下面執行的例子:
Future future = executorService.submit(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); future.get(); //returns null if the task has finished correctly.
注:如果任務執行完成,future.get()
方法會返回一個null。注意,future.get()方法會產生阻塞。
submit(Callable)
submit(Callable)
和submit(Runnable)
類似,也會返回一個Future對象,但是除此之外,submit(Callable)接收的是一個Callable的實現,Callable接口中的call()
方法有一個返回值,可以返回任務的執行結果,而Runnable接口中的run()
方法是void
的,沒有返回值。請看下面實例:
Future future = executorService.submit(new Callable(){ public Object call() throws Exception { System.out.println("Asynchronous Callable"); return "Callable Result"; } }); System.out.println("future.get() = " + future.get());
如果任務執行完成,future.get()方法會返回Callable任務的執行結果。另外,future.get()方法會產生阻塞。
invokeAny(…)
invokeAny(...)
方法接收的是一個Callable的集合,執行這個方法不會返回Future,但是會返回所有Callable任務中其中一個任務的執行結果。這個方法也無法保證返回的是哪個任務的執行結果,反正是其中的某一個。請看下面實例:
ExecutorService executorService = Executors.newSingleThreadExecutor(); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 1"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 2"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 3"; } }); String result = executorService.invokeAny(callables); System.out.println("result = " + result); executorService.shutdown();
大家可以嘗試執行上面代碼,每次執行都會返回一個結果,並且返回的結果是變化的,可能會返回“Task2”也可是“Task1”或者其它。
invokeAll(…)
invokeAll(...)
與 invokeAny(...)
類似也是接收一個Callable集合,但是前者執行之后會返回一個Future的List,其中對應着每個Callable任務執行后的Future對象。情況下面這個實例:
ExecutorService executorService = Executors.newSingleThreadExecutor(); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 1"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 2"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 3"; } }); List<Future<String>> futures = executorService.invokeAll(callables); for(Future<String> future : futures){ System.out.println("future.get = " + future.get()); } executorService.shutdown();
(3) ExecutorService關閉方法
當我們使用完成ExecutorService之后應該關閉它,否則它里面的線程會一直處於運行狀態。舉個例子,如果的應用程序是通過main()方法啟動的,在這個main()退出之后,如果應用程序中的ExecutorService沒有關閉,這個應用將一直運行。之所以會出現這種情況,是因為ExecutorService中運行的線程會阻止JVM關閉。
要關閉ExecutorService中執行的線程,我們可以調用ExecutorService.shutdown()
方法。在調用shutdown()方法之后,ExecutorService不會立即關閉,但是它不再接收新的任務,直到當前所有線程執行完成才會關閉,所有在shutdown()執行之前提交的任務都會被執行。
如果想立即關閉ExecutorService,我們可以調用ExecutorService.shutdownNow()
方法。這個動作將跳過所有正在執行的任務和被提交還沒有執行的任務。但是它並不對正在執行的任務做任何保證,有可能它們都會停止,也有可能執行完成。
2、使用Java8增強的ForkJoinPool產生線程池
在Java 8中,引入了自動並行化的概念。它能夠讓一部分Java代碼自動地以並行的方式執行,前提是使用了ForkJoinPool。
ForkJoinPool同ThreadPoolExecutor一樣,也實現了Executor和ExecutorService接口。它使用了一個無限隊列來保存需要執行的任務,而線程的數量則是通過構造函數傳入,如果沒有向構造函數中傳入希望的線程數量,那么當前計算機可用的CPU數量會被設置為線程數量作為默認值。
ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用比如快速排序算法。這里的要點在於,ForkJoinPool需要使用相對少的線程來處理大量的任務。比如要對1000萬個數據進行排序,那么會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬數據的合並任務。以此類推,對於500萬的數據也會做出同樣的分割處理,到最后會設置一個閾值來規定當數據規模到多少時,停止這樣的分割處理。比如,當元素的數量小於10時,會停止分割,轉而使用插入排序對它們進行排序。那么到最后,所有的任務加起來會有大概2000000+個。問題的關鍵在於,對於一個任務而言,只有當它所有的子任務完成之后,它才能夠被執行。所以當使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的線程無法像任務隊列中再添加一個任務並且在等待該任務完成之后再繼續執行。而使用ForkJoinPool時,就能夠讓其中的線程創建新的任務,並掛起當前的任務,此時線程就能夠從隊列中選擇子任務執行。比如,我們需要統計一個double數組中小於0.5的元素的個數,那么可以使用ForkJoinPool進行實現如下:
public class ForkJoinTest { private double[] d; private class ForkJoinTask extends RecursiveTask { private int first; private int last; public ForkJoinTask(int first, int last) { this.first = first; this.last = last; } protected Integer compute() { int subCount; if (last - first < 10) { subCount = 0; for (int i = first; i <= last; i++) { if (d[i] < 0.5){ subCount++;
} } }else { int mid = (first + last) /2; ForkJoinTask left = new ForkJoinTask(first, mid); left.fork(); ForkJoinTask right = new ForkJoinTask(mid + 1, last); right.fork(); subCount = left.join(); subCount += right.join(); } return subCount; } } public static void main(String[] args) { ForkJoinPool pool=new ForkJoinPool(); pool.submit(new ForkJoinTask(0, 9999999));
pool.awaitTermination(2,TimeUnit.SECONDS); System.out.println("Found " + n + " values"); } }
以上的關鍵是fork()和join()方法。在ForkJoinPool使用的線程中,會使用一個內部隊列來對需要執行的任務以及子任務進行操作來保證它們的執行順序。
注:使用ThreadPoolExecutor和ForkJoinPool的性能差異:
(1)首先,使用ForkJoinPool能夠使用數量有限的線程來完成非常多的具有父子關系的任務,比如使用4個線程來完成超過200萬個任務。但是,使用ThreadPoolExecutor時,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優先執行子任務,需要完成200萬個具有父子關系的任務時,也需要200萬個線程,顯然這是不可行的。
(2)ForkJoinPool能夠實現工作竊取(Work Stealing),在該線程池的每個線程中會維護一個隊列來存放需要被執行的任務。當線程自身隊列中的任務都執行完畢后,它會從別的線程中拿到未被執行的任務並幫助它執行。因此,提高了線程的利用率,從而提高了整體性能。
(3)對於ForkJoinPool,還有一個因素會影響它的性能,就是停止進行任務分割的那個閾值。比如在之前的快速排序中,當剩下的元素數量小於10的時候,就會停止子任務的創建。
結論:
- 當需要處理遞歸分治算法時,考慮使用ForkJoinPool;
- 仔細設置不再進行任務划分的閾值,這個閾值對性能有影響;
- Java 8中的一些特性會使用到ForkJoinPool中的通用線程池。在某些場合下,需要調整該線程池的默認的線程數量。
七、死鎖
產生死鎖的四個必要條件如下。當下邊的四個條件都滿足時即產生死鎖,即任意一個條件不滿足既不會產生死鎖。
(1)死鎖的四個必要條件
- 互斥條件:資源不能被共享,只能被同一個進程使用
- 請求與保持條件:已經得到資源的進程可以申請新的資源
- 非剝奪條件:已經分配的資源不能從相應的進程中被強制剝奪
- 循環等待條件:系統中若干進程組成環路,該環路中每個進程都在等待相鄰進程占用的資源
舉個常見的死鎖例子:進程A中包含資源A,進程B中包含資源B,A的下一步需要資源B,B的下一步需要資源A,所以它們就互相等待對方占有的資源釋放,所以也就產生了一個循環等待死鎖。
(2)處理死鎖的方法
- 忽略該問題,也即鴕鳥算法。當發生了什么問題時,不管他,直接跳過,無視它;
- 檢測死鎖並恢復;
- 資源進行動態分配;
- 破除上面的四種死鎖條件之一。
八、線程相關類
(1)ThreadLocal
ThreadLocal它並不是一個線程,而是一個可以在每個線程中存儲數據的數據存儲類,通過它可以在指定的線程中存儲數據,數據存儲之后,只有在指定線程中可以獲取到存儲的數據,對於其他線程來說則無法獲取到該線程的數據。 即多個線程通過同一個ThreadLocal獲取到的東西是不一樣的,就算有的時候出現的結果是一樣的(偶然性,兩個線程里分別存了兩份相同的東西),但他們獲取的本質是不同的。使用這個工具類可以簡化多線程編程時的並發訪問,很簡潔的隔離多線程程序的競爭資源。
對於多線程資源共享的問題,同步機制采用了“以時間換空間”的方式,而ThreadLocal采用了“以空間換時間”的方式。前者僅提供一份變量,讓不同的線程排隊訪問,而后者為每一個線程都提供了一份變量,因此可以同時訪問而互不影響。ThreadLocal類提供了如下的三個public方法:
ThreadLocal() 創建一個線程本地變量。 T get() 返回此線程局部變量的當前線程副本中的值,如果這是線程第一次調用該方法,則創建並初始化此副本。 protected T initialValue() 返回此線程局部變量的當前線程的初始值。
下面通過系統源碼來分析出現這個結果的原因。 在ThreadLocal中存在着兩個很重要的方法,get()和set()方法,一個讀取一個設置。
/** * Returns the value of this variable for the current thread. If an entry * doesn't yet exist for this variable on this thread, this method will * create an entry, populating the value with the result of * {@link #initialValue()}. * * @return the current value of the variable for the calling thread. */ @SuppressWarnings("unchecked") public T get() { // Optimized for the fast path. Thread currentThread = Thread.currentThread(); Values values = values(currentThread); if (values != null) { Object[] table = values.table; int index = hash & values.mask; if (this.reference == table[index]) { return (T) table[index + 1]; } } else { values = initializeValues(currentThread); } return (T) values.getAfterMiss(this); } /** * Sets the value of this variable for the current thread. If set to * {@code null}, the value will be set to null and the underlying entry will * still be present. * * @param value the new value of the variable for the caller thread. */ public void set(T value) { Thread currentThread = Thread.currentThread(); Values values = values(currentThread); if (values == null) { values = initializeValues(currentThread); } values.put(this, value); }
從注釋上可以看出,get方法會返回一個當前線程的變量值,如果數組不存在就會創建一個新的。另外,對於“當前線程”和“數組”,數組對於每個線程來說都是不同的 values.table。而values是通過當前線程獲取到的一個Values對象,因此這個數組是每個線程唯一的,不能共用,而下面的幾句話也更直接了,獲取一個索引,再返回通過這個索引找到數組中對應的值。這也就解釋了為什么多個線程通過同一個ThreadLocal返回的是不同的東西。
Java中為什么要這么設置呢?
- ThreadLocal在日常開發中使用到的地方較少,但是在某些特殊的場景下,通過ThreadLocal可以輕松實現一些看起來很復雜的功能。一般來說,當某些數據是以線程為作用域並且不同線程具有不同的數據副本的時候,就可以考慮使用ThreadLocal。例如在Handler和Looper中。對於Handler來說,它需要獲取當前線程的Looper,很顯然Looper的作用域就是線程並且不同的線程具有不同的Looper,這個時候通過ThreadLocal就可以輕松的實現Looper在線程中的存取。如果不采用ThreadLocal,那么系統就必須提供一個全局的哈希表供Handler查找指定的Looper,這樣就比較麻煩了,還需要一個管理類。
- ThreadLocal的另一個使用場景是復雜邏輯下的對象傳遞,比如監聽器的傳遞,有些時候一個線程中的任務過於復雜,就可能表現為函數調用棧比較深以及代碼入口的多樣性,這種情況下,我們又需要監聽器能夠貫穿整個線程的執行過程。這個時候就可以使用到ThreadLocal,通過ThreadLocal可以讓監聽器作為線程內的全局對象存在,在線程內通過get方法就可以獲取到監聽器。如果不采用的話,可以使用參數傳遞,但是這種方式在設計上不是特別好,當調用棧很深的時候,通過參數來傳遞監聽器這個設計太糟糕。而另外一種方式就是使用static靜態變量的方式,但是這種方式存在一定的局限性,拓展性並不是特別的強。比如有10個線程在執行,就需要提供10個監聽器對象。
注:ThreadLocal和其他所有的同步機制一樣,都是為了解決多線程中對於同一變量的訪問沖突。值普通的同步機制中,通過對象加鎖來實現多線程對同一變量的安全訪問,且該變量是多線程共享的,所有需要使用這種同步機制來明確分開是在什么時候對變量進行讀寫,在什么時候需要鎖定該對象。此種情況下,系統並沒有將這個資源復制多份,而是采取安全機制來控制訪問而已。ThreadLocal只是從另一個角度解決多線程的並發訪問,即將需要並發訪問的資源復制多份,每個線程擁有一份資源,每個線程都有自己的資源副本。
總結:若多個線程之間需要共享資源,以達到線程間的通信時,就使用同步機制;若僅僅需要隔離多線程之間的關系資源,則可以使用ThreadLocal。