Java 從單核到多核的多線程(並發)


JAVA 並發編程

      最初計算機是單任務的,然后發展到多任務,接着出現多線程並行,同時計算機也從單cpu進入到多cpu。如下圖:

     

 

      多任務:其實就是利用操作系統時間片輪轉使用的原理。操作系統通過將cpu的執行時間分割成多個時間片,為每個任務分配時間片,因為cpu處理速度很快,這樣就用戶看來好像每個任務都在同時執行,感覺有多個cpu,但本質上一個時間點只有一個任務在運行。

      隨着多核多線程的出現,我們可以更好的利用資源但是同時也面臨着更多的多線程編程挑戰。

      並行編程的好處:

     1)提高資源利用率,提升程序運行時間---cpu的就是利用率

     2)提高程序響應速度,比如用戶界面的點擊按鈕,就是使用多線程,服務器收到用戶點擊請求,將這個請求交於一個新線程(worker)去執行,這樣服務器就可以繼續等待用戶的輸入請求,否則服務器在處理上一個請求的時候是無法響應當前用戶的請求的。

     並行編程的代價和挑戰:

     1)增加內存的消耗。

     2)上下文的切換會消耗額外內存,從一個線程切換到另一個線程,需要記錄當前線程的數據變量,指針等,然后執行另一個線程。

     3)內存數據的同步,鎖,通信等問題。

    線程池(ThreadPool):

    我想大部分人在聽到這個東西的時候會感覺很神奇,但其實ThreadPool特別簡單。線程池就是我們通過人工或者手動設置內存當中線程數的數量,使得程序可以最優運行。簡單理解就是這樣:我們設置一個線程池的大小,比如線程池的數量為10,那么當有線程任務來臨的時候我們就使用線程池的線程去執行這個任務,如果線程池的10個線程都在執行任務,就把這個任務加到等待隊列,等候其他線程運行結束后再執行。

    使用線程池的好處:

     1)降低資源消耗,通過重復利用在線程池中已創建好的線程執行任務,減少創建、銷毀線程的內存和時間開銷。

     2)響應速度快,因為直接使用已創建好的線程執行任務,而不是去創建線程,所以響應時間快。

     3)可以更好的管理線程,因為線程是稀有資源,避免隨意創建線程(一個線程要暫用1M的內存左右)

   

      很多問題我們使用順序編程便可以解決,但是有些問題如果能夠使用多線程並行的執行其中的任務則可以很大程度的提高時間效率,所以多線程還是很有必要的。

      我自己總結了JAVA並行的3個發展階段(菜鳥總結,請體諒)

      第一階段:Thread ,Runable  

      第二階段:ExecutorService執行器

      第三階段: ForkJoin並行框架(其實就是ExecutorService的升級應用而已)

      並發很大一方面是為了提高程序運行速度,如果想要一個程序運行的更快,那么可以將其分為多個片段,然后在每個單獨的處理器上運行多個任務。現在是多核時代我們應該掌握,

      但是我們知道並發通常是用在提高單核機器的程序性能上,這個咋一聽感覺有點不能理解,學過操作系統的人應該知道,從一個任務切換到另一個任務是會有上下文開銷的。但是因為有了“阻塞”,使得這個問題變得有些不同。

     “阻塞”通常指的是一個程序的某個任務由於要執行程序控制之外的事,比如請求I/O資源,由於需要等待請求的I/O資源,所以會導致程序的暫停,就是cpu空閑。我們知道cpu是很寶貴的資源,我們應當充分利用它才對,這時候多線程就出來了,想想啊,當某個線程阻塞導致cpu空閑,這時候操作系統就將cpu分配給其他等待的線程使得cpu無時無刻不在運行。單個進程可以有多個並發執行的任務,我們感覺好像每個任務都有自己的cpu一樣,其底層機制就是切分cpu時間,通常來說不需要我們管。

      從事實上來看,如果程序沒有任何阻塞,那么在單處理器上的並發是沒有意義的。

   (1)傳統的並發編程采用Thread類

  • 創建Thread類子例並重寫run方法
  • 編寫類的時候實現Runnable接口方法,也是使用run方法。
public class App  {
    public static class demo extends Thread  
    {
        int x;
        public demo(int x)
        {
            this.x=x;
        }
        public void run() {
           System.out.print("線程"+x);
        } 
    }
    public static void main(String[] args) {
        demo dem=new demo(1);
        dem.start();         
    }

}

}

public class CommonRunnable implements Runnable{
public void run() {
System.out.println("MyRunnable running");	
}

    無論是Thread還是Runable其實都只要我們覆蓋實現Run方法就好了,但是由於java類只能繼承一次而接口可以有無數個所以我們更經常使用Ruanble接口。我們調用新線程都是使用start()方法而不是run()方法。

      start方法的本質:從cpu中申請另一個線程空間來執行run方法,這樣是並發線程。(其實它也是會自己調用run里面的方法,但是如果我們直接調用run方法的話,那么就是單線程而已)

     以上兩種雖然可以實現基本的並行結構,但是對於復雜的問題就會很麻煩,因此就有了在jdk5里面引入的Excutor執行器,其實就是實現線程池。

(2)ExecutorService執行器

        是指java 5中引入的一系列並發庫中與executor相關的一些功能類,其中包括線程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。

        Executor用來管理Runable對象的執行。用來創建並管理一組Runable對象的線程,這組線程就叫做線程池(Thread pool)

        並發編程的一種編程方式是把任務拆分為一些列的小任務,即Runnable,然后在提交給一個Executor執行,Executor.execute(Runnalbe) 。Executor在執行時使用內部的線程池完成操作。             

         提交或者執行任務:

         1)execute(Runnable) 無返回值,無法判斷一個線程任務是否已經執行完畢

         2)submit(Runnable)  會返回一個Future,通過get()判斷是否執行完畢

         3)submit(Callable)    會返回一個result(自定義的返回值)

    在Executor里面。我們可以使用Callable,Future返回結果,Future<V>代表一個異步執行的操作,通過get()方法可以獲得操作的結果,如果異步操作還沒有完成,則,get()會使當前線程阻塞。FutureTask<V>實現了Future<V>和Runable<V>。Callable代表一個有返回值得操作。

public class Task implements Callable<Integer> {

	@Override
	public Integer call() throws Exception {
		int sum=0;
		int begin=(int) (Math.random()*10);  //產生0-10的雙精度隨機數   
		for(int i=0;i<begin;i++)
		{
			sum+=i;
		}
		return sum;
	}

}

public class test {

/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
ThreadPoolExecutor myExecutor = new ThreadPoolExecutor(3, 10, 200, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>());
List<Future<Integer>> results = new ArrayList<Future<Integer>>();
for(int i=0;i<5;i++)
{
Task task=new Task();
Future<Integer> result = null;
result=myExecutor.submit(task);
results.add(result);
}

for (Future<Integer> f : results) {
try {
System.out.println(f.get());
} catch (Exception ex) {
// ex.printStackTrace();
f.cancel(true);
}
}
myExecutor.shutdown();
}

}

   結束任務:

  Shutdown : 中斷所有沒有正在執行的任務,等待當前正在執行的線程結束然后關閉

  ShutdowmNow:   遍歷線程池中的所有線程任務,然后中斷它們

上面並發執行的挺好的,但是有個問題。不同的線程執行有塊有慢,有的任務會提早執行完畢,因此為了利用這些提早執行完畢的線程,使用了一種工作竊取(work-stealing)算法。

  (3) ForkJoin並行框架  

    Fork/Join框架是Java7提供了的一個用於並行執行任務的框架, 是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。是不是很像map/reduce。

  在一個任務內,首先檢查這個任務的大小,如果它比設定的任務閾值要大,就將這個任務分成兩份或者多份。然后再使用框架執行。如果要小就直接解決問題。

                   

   

     

     Fork/Join 模式有自己的適用范圍。如果一個應用能被分解成多個子任務,並且組合多個子任務的結果就能夠獲得最終的答案,那么這個應用就適合用 Fork/Join 模式來解決。ForkJoin是將一個問題遞歸分解成子問題,再將子問題並行運算后合並結果。

    讓我們通過一個簡單的需求來使用下Fork/Join框架,需求是:計算1+2+3+4的結果。

使用Fork/Join框架首先要考慮到的是如何分割任務,如果我們希望每個子任務最多執行兩個數的相加,那么我們設置分割的閾值是2,由於是4個數字相加,所以Fork/Join框架會把這個任務fork成兩個子任務,子任務一負責計算1+2,子任務二負責計算3+4,然后再join兩個子任務的結果。

因為是有結果的任務,所以必須繼承RecursiveTask。

    我們只需要關注子任務的划分和中間結果的組合。ForkJoinTask完成子任務的划分,然后將它提交給ForkJoinPool來完成應用。

 

    如果大家有學習集成學習,那么使用Fork/Join來處理對大規模數據的投票是非常好的。比如:

    

 

    結果查看:可以從下面看到隨着分類器個數的增加,使用Fork/Join所提升的時間也是線性增加的。

 

    關於Fork/Join的查考:    http://blog.csdn.net/lubeijing2008xu/article/details/18036931

                                      http://www.oracle.com/technetwork/cn/articles/java/fork-join-422606-zhs.html

                                      http://www.ibm.com/developerworks/cn/java/j-lo-forkjoin/

                                      http://ifeve.com/java-concurrency-thread/

 

                                         

    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM