java學習:線程池和異步


1.異步和同步

同步執行很容易理解,代碼的操作順序就是程序執行的順序。但是實際使用中,很多場景常常會受限於同步執行,不能充分利用cpu的資源,例如,要查找一大批數據中的最大數,同步執行時,可能是花費10單位的時間讀取數據,1單位的時間進行計算,總計在11單位時間后得到結果;而,異步執行時,分派10個線程執行任務,將會花費1單位的時間讀取數據,1單位時間進行計算,總計在2單位時間后得到結果。

相對於同步而言,異步本質上是申請線程,提高cpu的利用率(單核cpu執行計算密集型任務時會降低)更快地得到結果。在解決問題時合理地選擇同步和異步能更好地利用好計算資源。

從數據的角度來看,在同步操作中,數據的變化都保持一定的先后順序關系,不會出現沖突的情況;而在異步操作中,數據隨時可能被其中某個線程更改,這時需要注意數據的一致性,尤其是寫操作,要保證事務性,這里可以對數據加鎖來實現。另外有時線程之間也需要保證一定的順序,需要使用線程鎖(這里有的可以通過編碼技巧或者回調方法解決)。

 

2.線程池

在面向對象編程中,創建和銷毀對象是很費時間的,因為創建一個對象要獲取內存資源或者其它更多資源。在Java中更是如此,虛擬機將試圖跟蹤每一個對象,以便能夠在對象銷毀后進行垃圾回收。所以提高服務程序效率的一個手段就是盡可能減少創建和銷毀對象的次數,特別是一些很耗資源的對象創建和銷毀。如何利用已有對象來服務就是一個需要解決的關鍵問題,其實這就是一些"池化資源"技術產生的原因。比如大家所熟悉的數據庫連接池正是遵循這一思想而產生的,接下來將介紹的線程池技術同樣符合這一思想。

在java中,可以使用java.util.concurrent.ThreadPoolExecutor來創建線程池,

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

創建方法中的幾個參數需要注意:

先說最重要的3個參數,corePoolSize,maximumPoolSize,workQueue。corePoolSize設定核心線程數,maximumPoolSize設定最大線程數,workQueue設定等待隊列。當有新任務時,首先檢查當前的線程數是否小於corePoolSize,如果是,則新建線程處理任務;如果不是,再檢查當前workQueue是否已滿,如果未滿,則把新的任務加入workQueue,core線程完成時會從workQueue中取得任務繼續執行;如果workQueue已滿,再檢查當前線程數是否小於maximumPoolSize,如果是,則創建線程處理任務,如果不是,則拋出拒絕服務的異常(默認是拋出異常,具體如何處理是最后一個參數RejectExecutionHandler來決定的)。

其他的參數分別是,keepAliveTime、unit控制空閑線程的存活時間;threadFactory,創建新的線程時使用的創建者;handler,拒絕服務時的處理方式;(后兩個參數都有默認的選擇)

 

從線程池的工作方式可以看到,core,max,queue決定了線程池的表現,下面講述這三個參數的參考設定。

一般來說,需要處理的qps平均為n,最大為m,每個請求的處理時間為t(秒)時,core=nt+,max=mt+,queue視需要而定(當這些請求需要盡快響應,cpu資源常有空閑時,queue=0)

對於cpu密集型任務,如大量數據的計算,匹配,排序等,這時cpu的處理能力成為瓶頸,core和max要注意不要設定得太大,要衡量好cpu的處理能力。

對於io密集型任務,如操作數據庫,http請求等,core和max可以考慮設定得更大,因為線程通常處於等待之中,不會耗費多少cpu。

(20160329.add)對於cpu密集型任務:cpu為單核時,沒有必要使用多線程:單線程時,cpu資源主要都在計算上;多線程時,cpu還需要額外耗費線程之間切換的資源,降低了計算效率。cpu為多核時,有必要使用多線程(單線程時只有一個核在進行計算)

——總的來說,當cpu常有空閑的情況時,就應該考慮使用多線程了。

 

一些使用的例子:

1.簡單的一個線程

public class TestClass {
    /*/*/
    private SettableFuture<String> settableFuture = SettableFuture.create();
    public void todo(final String param) throws Exception{
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(100);
                    System.out.println("begin " + param);
                    Thread.sleep(1000);
                    System.out.println("finish sleep");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    settableFuture.set("complete " + param);
                }
            }
        });
        thread.start();
    }

    public SettableFuture<String> getSettableFuture() {
        return settableFuture;
    }

    public static void main(String[] args) throws Exception {
        TestClass testClass = new TestClass();
        testClass.todo("test");
        System.out.println("start todo");
        System.out.println(testClass.getSettableFuture().get());
    }
}

 

2.線程池&異步回調

public class TestClass {
    private ListenableFuture<String> listenableFuture;
    private ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

    public void todo(final String param){
        listenableFuture = listeningExecutorService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("call " + param);
                Thread.sleep(100);
                return "call " + param + " complete";
            }
        });

        Futures.addCallback(listenableFuture, new FutureCallback<String>() {

            @Override
            public void onSuccess(String s) {
                try {
                    System.out.println(s + " success");
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("failed");
            }
        });
    public static void main(String[] args) throws InterruptedException {
        TestClass testClass = new TestClass();
        testClass.todo("test");
        System.out.println("ok");
    }
}

 

3.同時執行多個異步回調任務,等待所有任務結束后,輸出所有任務的執行結果

public class TestClass {
    private ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
    public void todo(final String param, final CountDownLatch countDownLatch, final List<String> result) throws InterruptedException {
            ListenableFuture listenableFuture = listeningExecutorService.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    Thread.sleep(100);
                    System.out.println("exec " + param);
                    result.add(String.valueOf(param));
                    System.out.println("exec "+param+" finished");
                    return String.valueOf(param);
                }
            });
            Futures.addCallback(listenableFuture, new FutureCallback<String>() {
                @Override
                public void onSuccess(String s) {
                    System.out.println("success "+s);
                    countDownLatch.countDown();
                }

                @Override
                public void onFailure(Throwable throwable) {
                    System.out.println("failed");
                    countDownLatch.countDown();
                }
            });
        }
    public static void main(String[] args) throws InterruptedException {
        int taskSize = 4;
        TestClass testClass = new TestClass();
        final List<String> result = Lists.newArrayList();
        final CountDownLatch countDownLatch = new CountDownLatch(taskSize);

        for (int i = 0; i < taskSize; i++) {
            testClass.todo("test" + i, countDownLatch, result);
        }

        System.out.println("add task finished");
        countDownLatch.await(10, TimeUnit.SECONDS);
        System.out.println(result);
        testClass.listeningExecutorService.shutdown();
    }
    //*/
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM