完整的后端開發流程-深入淺出Java線程池:使用篇


 

深入淺出Java線程池:使用篇

完整的后端開發流程

 

手動步驟走一種完整的后端開發流程

 

服務端

 

1、將遠程倉庫的jar包 拷貝 到本地倉庫

2、將項目代碼 拷貝 到本地 並建立路徑 能夠執行編譯

3、編譯打包項目(package)至項目下,項目跑起來后進行本地測試

4、版本穩定后,上測試環境

 

上測試環境

1、將遠程倉庫的jar包 拷貝 到測試環境

2、將本地的項目代碼 上傳 到測試環境 pom能建立路徑 執行mvn腳本進行編譯打包

3、編譯打包項目(package)至項目下,項目跑起來后進行測試

4、版本在測試環境穩定后,install至本地倉庫,在上傳至遠程倉庫

5、不推薦嫌麻煩直接上傳本地jar包的方式,因為這樣無法發現由於環境造成的錯誤而且傳輸速度沒有直接編譯的快

 

客戶端聯調

 

1、將遠程倉庫的jar包(包括剛剛上傳的服務端jar) 拷貝 到本地倉庫

2、將項目代碼 拷貝 到本地 並建立路徑 能夠執行編譯

3、編譯打包項目(package)至項目下,項目跑起來后進行本地測試

4、項目注冊至RPC服務中來訪問跑在測試環境的服務端項目

5、版本穩定后,上測試環境聯調。

 

 

 

團隊的技術棧,基於這個背景再展開后面將提到的幾個問題,將會有更深刻的體會。

控制層基於SpringMvc,數據持久層基於JdbcTemplate自己封裝了一套類MyBatis的Dao框架,視圖層基於Velocity模板技術,其余組件基於SpringCloud全家桶。

 

問題1

某應用發布以后開始報數據庫連接池不夠用異常,日志如下:

1
com.alibaba.druid.pool.GetConnectionTimeoutException: wait millis 60000, active 500, maxActive 500, creating 0   

很明顯這是數據庫連接池滿了,當時處於業務低峰期,所以很顯然並不是由於流量突發造成的,另一種可能性是長事務導致,一般是事務中摻雜了外部網絡調用,最終跟業務負責人一起排除了長事務的可能性。

 

還有什么可能呢?我隨即想到了是不是沒有釋放連接導致,我跟業務負責人說了這個想法,他說這種可能性不大,連接的獲取和釋放都是由框架完成的,如果這塊有問題早反映出來了,我想也是。

 框架的確給我們帶來了很大的便利性,將業務中一些重復性的工作下沉到框架中,提高了研發效率,不誇張的說有些人脫離了Spring,MyBatis,SpringMvc這些框架,都不會寫代碼了。

 

那會是什么原因呢?我又冒出來一個想法,有沒有可能是某些功能框架支持不了,所以開發繞過了框架自己實現,進而導致連接沒有釋放,我跟業務負責人說了這個想法以后,他說:“這個有可能,這次有個功能需要獲取到數據庫名,所以自己通過Connection對象獲取的”,說到這兒答案大概已經出來了,一起看下這段代碼:

public String getSchema(String tablename, boolean cached) throws Exception {
    return this.getJdbcTemplate(tablename).getDataSource().getConnection().getCatalog();
}

代碼很簡單通過JdbcTemplate獲取DataSource,再通過DataSource獲取Connection,最終通過Connection獲取數據庫名,就是這一行簡單的代碼將數據庫連接耗盡,因為這里並沒有釋放連接的動作,之前的為什么都沒有問題呢,因為普通的查詢都是委派給JdbcTemplate來實現的,它內部會釋放連接,找一個簡單的query方法看下:

復制代碼
public <T> T query(PreparedStatementCreator psc, @Nullable final PreparedStatementSetter pss, final ResultSetExtractor<T> rse) throws DataAccessException {
        Assert.notNull(rse, "ResultSetExtractor must not be null");
        this.logger.debug("Executing prepared SQL query");
        return this.execute(psc, new PreparedStatementCallback<T>() {
            @Nullable
            public T doInPreparedStatement(PreparedStatement ps) throws SQLException {
                ResultSet rs = null;
​
                Object var3;
                try {
                    if (pss != null) {
                        pss.setValues(ps);
                    }
​
                    rs = ps.executeQuery();
                    var3 = rse.extractData(rs);
                } finally {
                    JdbcUtils.closeResultSet(rs);
                    if (pss instanceof ParameterDisposer) {
                        ((ParameterDisposer)pss).cleanupParameters();
                    }
​
                }
​
                return var3;
            }
        });
    }

復制代碼
復制代碼
    public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action) throws DataAccessException {
        Assert.notNull(psc, "PreparedStatementCreator must not be null");
        Assert.notNull(action, "Callback object must not be null");
        if (this.logger.isDebugEnabled()) {
            String sql = getSql(psc);
            this.logger.debug("Executing prepared SQL statement" + (sql != null ? " [" + sql + "]" : ""));
        }
​
        Connection con = DataSourceUtils.getConnection(this.obtainDataSource());
        PreparedStatement ps = null;
​
        Object var13;
        try {
            ps = psc.createPreparedStatement(con);
            this.applyStatementSettings(ps);
            T result = action.doInPreparedStatement(ps);
            this.handleWarnings((Statement)ps);
            var13 = result;
        } catch (SQLException var10) {
            if (psc instanceof ParameterDisposer) {
                ((ParameterDisposer)psc).cleanupParameters();
            }
​
            String sql = getSql(psc);
            psc = null;
            JdbcUtils.closeStatement(ps);
            ps = null;
            DataSourceUtils.releaseConnection(con, this.getDataSource());
            con = null;
            throw this.translateException("PreparedStatementCallback", sql, var10);
        } finally {
            if (psc instanceof ParameterDisposer) {
                ((ParameterDisposer)psc).cleanupParameters();
            }
​
            JdbcUtils.closeStatement(ps);
            DataSourceUtils.releaseConnection(con, this.getDataSource());
        }
​
        return var13;
    }
​
復制代碼

query方法基於execute這個模板方法實現,在execute內部會通過finally來確保連接的釋放

DataSourceUtils.releaseConnection,所以不會有連接耗盡的問題,問題已經很清晰了,改造也很簡單,大概有幾下幾種方法:

1.顯示的關閉連接,這里可以借助jdk的try resource語句,簡單明了。

 public String getSchema(String tablename, boolean cached) throws Exception {
      try(Connection connection = this.getJdbcTemplate(tablename).getDataSource().getConnection()){
                return connection.getCatalog();
      }        
}    

2.借助於JdbcTemplate的模板方法設計思想來解決,它提供了一個execute方法,用戶只要實現ConnectionCallback這個接口就可以獲取到Connection對象,在內部執行獲取數據庫名的邏輯,最終關閉連接由finally完成。

復制代碼
/**
   * Execute a JDBC data access operation, implemented as callback action
   * working on a JDBC Connection. This allows for implementing arbitrary
   * data access operations, within Spring's managed JDBC environment:
   * that is, participating in Spring-managed transactions and converting
   * JDBC SQLExceptions into Spring's DataAccessException hierarchy.
   * <p>The callback action can return a result object, for example a domain
   * object or a collection of domain objects.
   * @param action a callback object that specifies the action
   * @return a result object returned by the action, or {@code null} if none
   * @throws DataAccessException if there is any problem
   */
@Nullable
public <T> T execute(ConnectionCallback<T> action) throws DataAccessException {
    Assert.notNull(action, "Callback object must not be null");
    Connection con = DataSourceUtils.getConnection(this.obtainDataSource());
​
    Object var10;
    try {
        Connection conToUse = this.createConnectionProxy(con);
        var10 = action.doInConnection(conToUse);
    } catch (SQLException var8) {
        String sql = getSql(action);
        DataSourceUtils.releaseConnection(con, this.getDataSource());
        con = null;
        throw this.translateException("ConnectionCallback", sql, var8);
    } finally {
        DataSourceUtils.releaseConnection(con, this.getDataSource());
    }
​
        return var10;
 }
復制代碼
復制代碼
jdbcTemplate.execute(new ConnectionCallback<Object>() {
      @Override
      public Object doInConnection(Connection connection) throws SQLException, DataAccessException {
          return connection.getCatalog();
      }
});
復制代碼

雖然兩種都能解決問題,但我還是更推崇第二種方式,因為這種更貼合框架的設計思想,將一些重復性的邏輯繼續交給框架去實現,這里也體現出框架很重要的一個特點,就是對使用者提供擴展。

 

問題2

前幾天寫了一個Spring AOP的攔截功能,發現怎么也進不到攔截邏輯中,表達式確定沒問題,讓我百思不得其解,最終冷靜下來逐步排錯。

 

第一個很明顯的錯誤是被攔截的對象並沒有納入Spring管理,所以當即把對象交由Spring管理,問題依然沒有解決,我開始回想代理的原理。

Spring代理提供了兩種實現方式,一種是jdk的動態代理,另一種是cglib代理,這兩種方式分別適用於代理類實現了接口和代理類未實現接口的情況,其內部思想都是基於某種規約(接口或者父類)來生成一個Proxy對象,在Proxy對象方法調用時先調用InvocationHandler的invoke方法,在invoke方法內部先執行代理邏輯,再執行被代理對象的真實邏輯,這里貼一段jdk動態代理生成的Proxy對象的源文件供大家閱讀:

復制代碼
public class ProxyTest {
   /**
  定義目標接口,內部包含一個hello方法(這其實就是一個規約)
  */
    public interface ProxyT{
        void hello();
    }
​
    /**
    實現類,實現了ProxyT接口
    */
    public static class ProxyTImpl implements ProxyT{
        @Override
        public void hello() {
            System.out.println("aaaa");
        }
    }
​
    public static void main(String[] args) {
        //設置生成Proxy對象的源文件
        System.getProperties().put("sun.misc.ProxyGenerator.saveGeneratedFiles", "true");
​
        ProxyT proxyT1 = (ProxyT)Proxy.newProxyInstance(ProxyT.class.getClassLoader(),new Class[]{ProxyT.class}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                System.out.println("invoke");
                return method.invoke(proxyT,args);
            }
        });
​
        proxyT1.hello();
    }
}
復制代碼

最終生成的Proxy源文件如下:

復制代碼
package com.sun.proxy;
​
import coding4fun.ProxyTest.ProxyT;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
​
/**
生成的proxy源碼,繼承jdk的Proxy類,實現了ProxyT接口
(這里其實也解釋了為什么jdk的動態代理只能基於接口實現,不能基於父類,因為Proxy
必須繼承jdk的Proxy,而java又是單繼承,所以Proxy只能基於接口這個規約來生成)
*/
public final class $Proxy0 extends Proxy implements ProxyT {
    private static Method m1;
    private static Method m3;
    private static Method m2;
    private static Method m0;
​
    public $Proxy0(InvocationHandler var1) throws  {
        super(var1);
    }
​
    public final boolean equals(Object var1) throws  {
        try {
            return (Boolean)super.h.invoke(this, m1, new Object[]{var1});
        } catch (RuntimeException | Error var3) {
            throw var3;
        } catch (Throwable var4) {
            throw new UndeclaredThrowableException(var4);
        }
    }
​
    //hello方法將調用權交給了InvocationHandler
    public final void hello() throws  {
        try {
            super.h.invoke(this, m3, (Object[])null);
        } catch (RuntimeException | Error var2) {
            throw var2;
        } catch (Throwable var3) {
            throw new UndeclaredThrowableException(var3);
        }
    }
​
    public final String toString() throws  {
        try {
            return (String)super.h.invoke(this, m2, (Object[])null);
        } catch (RuntimeException | Error var2) {
            throw var2;
        } catch (Throwable var3) {
            throw new UndeclaredThrowableException(var3);
        }
    }
​
    public final int hashCode() throws  {
        try {
            return (Integer)super.h.invoke(this, m0, (Object[])null);
        } catch (RuntimeException | Error var2) {
            throw var2;
        } catch (Throwable var3) {
            throw new UndeclaredThrowableException(var3);
        }
    }
​
    static {
        try {
            m1 = Class.forName("java.lang.Object").getMethod("equals", Class.forName("java.lang.Object"));
            m3 = Class.forName("coding4fun.ProxyTest$ProxyT").getMethod("hello");
            m2 = Class.forName("java.lang.Object").getMethod("toString");
            m0 = Class.forName("java.lang.Object").getMethod("hashCode");
        } catch (NoSuchMethodException var2) {
            throw new NoSuchMethodError(var2.getMessage());
        } catch (ClassNotFoundException var3) {
            throw new NoClassDefFoundError(var3.getMessage());
        }
    }
}
復制代碼

到這里其實我已經有了答案,是我給Spring的規約(接口或者父類)出現了問題,首先我要代理的類並沒有實現接口,所以這里的規約不是接口,而是我這個類本身,從cglib的原理來講,它是將要代理的類作為父類來生成一個Proxy類,重寫要代理的方法,進而添加代理邏輯,問題就在於我那個類的方法是static的,而static方法是沒法重寫的,所以導致一直沒有進攔截邏輯,將static方法改為實例方法就解決了問題,這里貼一段cglib動態代理生成的Proxy對象的源文件供大家閱讀:

復制代碼
public class cglibtest {
    //定義被代理的類ProxyT,內部有一個hello方法
    public static class ProxyT{
        public void hello() {
            System.out.println("aaaa");
        }
    }
​
    //定義一個方法攔截器,和jdk的InvocationHandler類似
    public static class Interceptor implements MethodInterceptor {
        @Override
        public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
            //簡單的打印
            System.out.println("before invoke hello");
            //執行被代理類的方法(hello)
            return methodProxy.invokeSuper(o,objects);
        }
    }
​
    public static void main(String[] args) {
        // 設置CGLib代理類的生成位置
        System.setProperty(DebuggingClassWriter.DEBUG_LOCATION_PROPERTY, "./cg");
        // 設置JDK代理類的輸出
        System.getProperties().put("sun.misc.ProxyGenerator.saveGeneratedFiles", "true");
​
        MethodInterceptor methodInterceptor = new Interceptor();
​
        Enhancer enhancer = new Enhancer();
        //設置父類
        enhancer.setSuperclass(ProxyT.class);
        //設置方法回調
        enhancer.setCallback(methodInterceptor);
​
        ProxyT proxy = (ProxyT)enhancer.create();
        proxy.hello();
    }
}
​
復制代碼

最終生成的Proxy源文件如下(刪除了部分代碼,只保留了重寫hello方法邏輯):

復制代碼
//繼承ProxyT
public class cglibtest$ProxyT$$EnhancerByCGLIB$$8b3109a3 extends ProxyT implements Factory {
   final void CGLIB$hello$0() {
        super.hello();
    }
​
    //重寫hello方法
    public final void hello() {
        //方法攔截器
        MethodInterceptor var10000 = this.CGLIB$CALLBACK_0;
        if (var10000 == null) {
            CGLIB$BIND_CALLBACKS(this);
            var10000 = this.CGLIB$CALLBACK_0;
        }
​
        if (var10000 != null) {
            //執行方法攔截器
            var10000.intercept(this, CGLIB$hello$0$Method, CGLIB$emptyArgs, CGLIB$hello$0$Proxy);
        } else {
            super.hello();
        }
    }
}
復制代碼

總結

前面描述了筆者近期工作中遇到的兩個問題,不能說多么有難度,但是我相信應該有不少人都碰到過,不知道你是怎么解決的呢?解決了以后有沒有深挖其背后的原理呢,好多人說自己的工作都是簡單的crud沒有提高,那何不嘗試着深挖框架背后的原理,深挖那些看似普通但背后並不簡單的問題的本質。

 

框架雖好,但不要丟了其背后的原理。

 

<h3>深入淺出Java線程池:使用篇</h3>

 

深入淺出Java線程池:使用篇

 

前言

很高興遇見你~

借助於很多強大的框架,現在我們已經很少直接去管理線程,框架的內部都會為我們自動維護一個線程池。例如我們使用最多的okHttp以及他的封裝框架Retrofit,線程封裝框架RxJava和kotlin協程等等。為了更好地使用這些框架,則必須了解他的實現原理,而了解他的原理,線程池是永遠繞不開的話題。

線程的創建與切換的成本是比較昂貴的。JVM的線程實現使用的是輕量級進程,也就是一個線程對應一個cpu核心。因此在創建與切換線程時,則會涉及到系統調用,是開銷比較大的過程。為了解決這個問題,線程池誕生了。

與很多連接池,如sql連接池、http連接池的思想類似,線程池的出現是為了復用線程,減少創建和切換線程所帶來的開銷,同時可以更方便地管理線程。線程池的內部維護有一定數量的線程,這些線程就像一個個的“工人”,我們只需要向線程池提交任務,那么這些任務就會被自動分配到這些“工人”,也就是線程去執行。

線程池的好處:

  1. 減少資源損耗。重用線程、控制線程數量,減少線程創建和切換所帶來的開銷。
  2. 提高響應速度。可直接使用線程池中空閑的線程而不必等待線程的創建。
  3. 方便管理線程。線程池可以對其中的線程進行簡單的管理,如實時監控數據調整參數、設置定時任務等。

這個系列文章主要分兩篇:使用篇與原理篇。作為使用篇,顧名思義,主要是了解什么是線程池以及如何正確去使用它。

線程池的主要實現類有兩個:ThreadPoolExecutor和ScheduledThreadPoolExecutor,后者繼承了前者。線程池的配置參數比較多,系統也預設了一些常用的線程池,放在Executors中,開發者只需要配置簡單的參數就可以。線程池的可執行任務有兩種對象:Runnable和Callable,這兩種對象可以直接被線程池執行,以及他們的異步返回對象Future。

那么以上講到的,就是本文要討論的內容。首先,從線程池的可執行任務開始。

任務類型

Runnable

public interface Runnable { public abstract void run(); } 

Runnable我們是比較熟悉的了。他是一個接口,內部只有一個方法 run ,沒有參數,沒有返回值。當我們提交一個Runnable對象給線程池執行時,他的 run 方法會被執行。

Callable

public interface Callable<V> { V call() throws Exception; } 

與Runnable很類似,最大的不同就是他擁有返回值,而且會拋出異常。任務對象適合用於需要等待一個返回值的后台計算任務。

Future

public interface Future<V> { // 取消任務 boolean cancel(boolean mayInterruptIfRunning); // 任務是否被取消 boolean isCancelled(); // 任務是否已經完成 boolean isDone(); // 獲取返回值 V get() throws InterruptedException, ExecutionException; // 在一定的時間內等待返回值 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } 

Futrue他並不是一個任務,而是一個計算結果異步返回的管理對象。前面的Callable任務提交給線程池之后,他需要一定的計算時間才能將結果返回,所以線程池會返回一個Future對象。我們可以調用他的 isDone 方法來檢測是否完成,如果完成可以調用 get 方法來獲取結果。同時也可以通過 cancel 和 isCancel 來取消或者判斷任務是否被取消。如下圖:

  1. 當Future未被執行或正在執行中時,get方法會阻塞直到執行完成。如果不希望等待時間過長,可以調用它另外一個帶有時間參數的方法,該方法等待指定時間之后,如果任務尚未完成則會拋出TimeoutException異常。
  2. 當Future執行完成之后,get方法會返回結果;而此時如果任務是被取消,那么會拋出異常。
  3. 當一個任務尚未被執行時,cancel方法會讓該任務不會被執行,直接結束。
  4. 當任務正在執行,cancel方法的參數如果為false,那么不會中斷任務;而如果參數是true則會嘗試中斷任務。
  5. 當任務已完成,取消方法返回false,取消失敗。

Futrue接口的具體實現類是FutureTask,該類同時實現了Runnable接口,可以被線程池直接執行。我們可以通過傳入一個Callable對象來創建一個FutureTask。如果是Runnable對象,則可以通過 Executors.callable() 來構造一個Callable對象,只不過這個Callable對象返回null,所構造出來的Future對象get方法在成功時也會返回null。當FutureTask的 run 方法被執行后,其所包含的任務開始執行。

當然,我們也可以單獨使用FutureTask,如下:

// 創建一個FutureTask FutureTask<String> future = new FutureTask<>(() -> { Thread.sleep(1000); return "一只修仙的猿"; }); // 使用一個后台線程來執行他的run方法 new Thread(future).start(); // 執行結束之后可以通過他的get方法得到返回值 System.out.println(future.get()); 

當然,如果我們要這樣使用的話,那為什么不用線程池呢?(手動狗頭)

接下來介紹線程池的兩個主要類:ThreadPoolExecutor和ScheduledThreadPoolExecutor。

ThreadPoolExecutor

概述

我們常說的線程池,很大程度上指的就是ThreadPoolExecutor,他也是我們使用最為頻繁的線程池。ThreadPoolExecutor的內部核心角色有三個:等待隊列、線程和拒絕策略者:

線程池的內部很像一個工廠,等待隊列就如同一個流水線,我們的任務就放在里面。 線程分為核心線程和非核心線程,他們就如同一個個的 “工人” ,從等待隊列中獲取任務進行執行。而如果這個“工廠”已經無法接受更多的任務,那么這個任務就會交給拒絕策略者去處理。

這里要特別注意的是,圖中我分為兩種線程是為了方便理解,而在實際中, 線程本身並沒有核心與非核心之分 ,只有線程數大於核心線程數與小於核心線程數之分 。

例如核心線程數限制為3,但現在有4個線程正在工作:甲乙丙丁。當甲先執行完成時進入空閑狀態時,因為總數超出設定的3,那么甲會被認為非核心線程被關閉。同理,如果丁先執行完成任務,則是丁被認為非核心線程被關閉。


ThreadPoolExecutor並不是把每個新任務都直接放到等待隊列中,而是有一套既定的規則來執行每個新任務:

  • 情況1:在線程數沒有達到核心線程數時,每個新任務都會創建一個新的線程來執行任務。
  • 情況2:當線程數達到核心線程數時,每個新任務會被放入到等待隊列中等待被執行。
  • 情況3:當等待隊列已經滿了之后,如果線程數沒有到達總的線程數上限,那么會創建一個非核心線程來執行任務。
  • 情況4:當線程數已經到達總的線程數限制時,新的任務會被拒絕策略者處理,線程池無法執行該任務。

這里我們可以發現對於線程池中的角色,有各種各樣的數量上限,具體的有以下參數:

  • 核心線程數corePoolSize:指定核心線程的數量上限;當線程數小於核心線程數,那么線程是不會被銷毀的,除非通設置線程池的 allowCoreThreadTimeOut 參數為true,那么核心線程在等待 keepAliveTime 時間之后,就會被銷毀。
  • 允許核心線程被回收allowCoreThreadTimeOut :是否允許核心線程被回收。
  • 最大線程數maximumPoolSize:指定線程總數的上限。線程總數=核心線層數+非核心線程數。當等待隊列滿了之后,新的任務會創建新的非核心線程來執行,直到線程數到達總數的上限。
  • 線程閑置時間keepAliveTime:線程空閑等待該時間后會被銷毀。非核心線程默認會被銷毀,而核心線程則需要開發者自己設定是否允許被銷毀。
  • 等待隊列BlockingQueue:存放任務的隊列。我們可以在創建隊列的時候設置隊列的長度。一般使用的隊列有LinkedBlockingQueue、ArrayBlockingQueue、SychronizeQueue、PriorityBlockingQueue等,前兩者是普通的阻塞隊列,最后一個是優先阻塞隊列,剩下的一個是沒有容量的隊列。
  • 拒絕策略者RejectExecutionHandler :線程池無法處理的任務就會交給他處理。

通過設置線程池的這些參數可以讓線程池擁有完全不同的特性,來適應不同的情景。通常我們在ThreadPoolExecutor的構造方法中,會傳入這些參數:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... } 

基本都是我們上面介紹過的參數,有兩個參數我們還沒見過:

  • 時間單位unit:給前面的keepAliveTime指定時間單位。
  • 線程工廠threadFactory: 線程池會通過線程工廠來創建新的線程,主要是給線程指定一個有意義的名字。

當然,我們也可以不用全部指定上面的參數,ThreadFactory和RejectedExecutionHandler不指定可以使用默認的參數。

配置ThreadPoolExecutor

經過概述,我們對ThreadPoolExecutor的參數以及內部結構已經非常清楚了,接下來探討一下如何合理地進行配置。

首先是核心線程數。核心線程太少,線程之間會互相爭奪cpu資源,多個線程之間進行時間片輪換,大量的線程切換工作會增大系統的開銷;核心線程太少,會導致一些cpu核心在掛起等待阻塞操作,沒法充分利用cpu資源;核心線程數的配置就是要均衡這兩個特點。

關於核心線程數的配置,有一個廣為人知的默認規則:

cpu密集型任務,設置為CPU核心數+1;
IO密集型任務,設置為CPU核心數*2;

深入淺出Java線程池:使用篇

CPU密集型任務指的是需要cpu進行大量計算的任務,這個時候我們需要盡可能地壓榨CPU的利用率。此時核心數不宜設置過大,太多的線程會互相搶占cpu資源導致不斷切換線程,反而浪費了cpu。最理想的情況是每個CPU都在進行計算,沒有浪費。但很有可能其中的一個線程會突然掛起等待IO,此時額外的一個等待線程就可以馬上進行工作,而不必等待掛起結束。

IO密集型任務指的是任務需要頻繁進行IO操作,這些操作會導致線程長時間處於掛起狀態,那么需要更多的線程來進行工作,不會讓cpu都處於掛起狀態,浪費資源。一般設置為cpu核心數的兩倍即可。

當然實際情況還需要根據任務具體特征來配置,例如系統資源有限,有一些線程被掛在后台持續工作,那么這個時候就必須適當減少線程數,從而減少線層切換的次數。


第二是阻塞隊列的配置。

阻塞隊列有4個內置的選擇:LinkedBlockingQueue、ArrayBlockingQueue、SychronizeQueue和PriorityBlockingQueue,其次還有比較少用的DelayedWorkQueue。

如果學習過Android的Handler機制的話,會發現他們跟Handler內部的MessageQueue是非常像的。ThreadPoolExecutor中的線程,相當於Handler的Looper;阻塞隊列,相當於Handler的MessageQueue。他們都會去隊列中獲取新的任務,當隊列為空時,queue.poll()方法會進行阻塞,直到下一個新的任務來臨。

LinkedBlockingQueue、ArrayBlockingQueue都是普通的阻塞隊列,尾插頭出;區別是后者在創建的時候必須指定長度。
SychronizeQueue是一個沒有容量的隊列,每一個插入到這個隊列的任務必須馬上找到可以執行的線程,如果沒有則拒絕執行。
PriorityBlockingQueue是具有優先級的阻塞隊列,里面的任務會根據設置的優先級進行排序。所以優先級低的任務可能會一直被優先級高的任務頂下去而得不到執行。
DelayedWorkQueue則非常像Handler中的MessageQueue了,可以給任務設置延時。

阻塞隊列的選擇也是非常重要,一般來說必須要指定阻塞隊列的長度。如果使用無限長的隊列,那么有可能在大量的任務到來時直接OOM,程序崩潰。


第三是線程總數。

在阻塞隊列滿了之后,如果線程總數還沒到達上限,會創建額外的線程來執行。這對應付突然的大量但輕量的任務的時候有奇效。通過創建更多的線程來提高並發效率。

但是同時也要注意,如果線程數量太多,會造成頻繁進行線程切換導致高昂的系統開銷。


第四是線程存活時間。

這里一般指非核心線程的存活時間。這個參數的意義在於,當線程都進入空閑的時候,可以回收部分線程來減少系統資源占用。為了提高線程池的響應速度,一般比較少去回收核心線程。


第五是線程工廠。

這個參數比較簡單,繼承接口然后重寫 newThread 方法即可。主要的用途在於給創建的線程設置一個有意義的名字。


最后一個是拒絕策略者。

ThreadPoolExecutor內置有4種策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。默認是使用AbortPolicy,我們可以在構造方法中傳入這些類的實例,在任務被拒絕的時候,會回調他們的 rejectedExecution 方法。

  • AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
  • DiscardPolicy:也是丟棄任務,但是不拋出異常。
  • DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)
  • CallerRunsPolicy:由調用線程直接執行該任務

當然我們也可以自己實現RejectedExecutionHandler接口來自定義自己的拒絕策略。

線程池的使用

  • execute(Runnable command) :執行一個任務
  • Future submit(Runnable/Callable):提交一個任務,這個任務擁有返回值
  • shutDown/shutDownNow:關閉線程池。后者還有去嘗試中斷正在執行的任務,也就是調用正在執行的線程的interrupt方法
  • preStartAllCoreThread:提前啟動所有的線程
  • isShutDown:線程池是否被關閉
  • isTerminad:線程池是否被終止。區別於shutDown,這個狀態下的線程池沒有正在執行的任務

這些都是比較常用的api,更多的api讀者可以去閱讀api文檔。

監控線程池

ThreadPoolExecutor中有很多參數可以提供我們參考線程池的運行情況:

  • largestPoolSize:線程池中曾經到達的最大線程數量
  • completedTaskCount:完成的任務個數
  • getAliveCount:獲取當前正在執行任務的線程數
  • getTaskCount:獲取當前任務個數
  • getPoolSize:獲取當前線程數

此外還有很多的get方法來獲取相關參數,提供動態的線程池運行情況監控,感興趣的讀者可以去閱讀相關的api。

ThreadPoolExecutor中還有提供了一些的回調方法:

  • beforeExecute:在任務執行前被調用
  • afterExecute:在任務執行之后被調用
  • terminated:在線程池終止之后被調用

我們可以通過繼承ThreadPoolExecutor來重寫這些方法。

ScheduledThreadPoolExecutor

概述

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,整體的內部結構和ThreadPoolExecutor是一樣的。有一個重點的不同就是阻塞隊列使用的是DelayedWorkQueue。他可以根據我們設置的時間延遲來對任務進行排序,讓任務按照時間順序進行執行,和MessageQueue非常像。

ScheduledThreadPoolExecutor實現了ScheduledExecutorService接口,有一系列可以設置延時任務、周期任務的api。

定時周期任務我們會想到Timer這個框架。這里簡單做個對比:

  • Timer是系統時間敏感的,他會根據系統時間來觸發任務
  • Timer內部只有一個線程,可能會造成執行任務阻塞無法按時執行;而ScheduledThreadPoolExecutor可以創建多個線程來執行任務
  • Timer遇到異常時會直接拋出,線層終止;而ScheduledThreadPoolExecutor有一個更加完善的異常處理機制。

參數配置

先看到ShcduledhreadPoolExecutor的構造器:

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory, handler); } 

ScheduledThreadPoolExecutor有幾個構造器,參數數量最多的是上面這個。父類是ThreadPoolExecutor,所以這里是直接調用到ThreadPoolExecutor的構造器:

  • 核心線程數:這個由我們自己傳入參數設定
  • 線程數上限:這個設置Integer.MAX_VALUE,可以認為是沒有上限
  • keepAliveTime:10毫秒,基本上只要線程進入空閑狀態馬上就會被回收
  • 阻塞隊列:DelayedWorkQueue在上面介紹過了,可以設置延遲時間的阻塞隊列
  • 線程工廠和拒絕策略由開發者自定義

可以看到ShceduledThreadPoolExecutor的配置還是比較簡單的,重點在於他的延時阻塞隊列可以設置延時任務;keepAliveTime時間的設置讓空閑的線程馬上就會被回收。

線程池的使用

ScheduledThreadPoolExecutor有ThreadPoolExecutor的接口,同時還包含了一些定時任務的接口:

  • schedule:傳入一個任務和延遲的時間,在延遲時間之后開始執行任務
  • scheduleAtFixedRate:設置固定時間點指定任務。傳入兩個時間,第一次執行在延遲初始化的時間后;之后每隔指定時間執行一次。
  • scheduledWithFixedDelay:在初始延遲之后,每次執行之后延遲指定時間再次執行。

重點就是上面的三個方法的使用,其他的和ThreadPoolExecutor類似。

Executors

線程池配置的參數很多,框架內置了一些已經配置好參數的線程池,如果懶得去配置參數,可直接使用內置的線程池,可以使用Executors.newXXX方法來構建。

ThreadPoolExecutor有三個配置好參數的線程池:FixedTthreadPool、CacheThreadPool、SingleThreadExecutor。ScheduledThreadPoolExecutor有兩個配置好參數的線程池:ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor。

我們看一下這些線程池:

FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } 

這類線程池只有核心線程,數量固定且不會被回收,等待隊列的長度沒有上限。

這個線程池的特點就是線程數量固定,適用於負載較重的服務器,可以通過這種線程池來限制線程數量;線程不會被回收,也有比較高的響應速度。

但是等待隊列沒有上限,在任務過多時有可能發生OOM。

CacheThreadPool

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); } 

這類線程池沒有核心線程,而且線程數量上限為Integer.MAX_VALUE,可以認為沒有上限。等待隊列沒有長度,每一個任務到來都會分配一個線程來執行。

這類線程池的特點就是每個任務都會被馬上執行,在任務數量過大時可能會創建大量的線程導致系統OOM。但是在一些任務數量多但執行時間短的情景下比較適用。

SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } 

只有一個線程的線程池,等待隊列沒有上限,每個任務都會按照順序被執行。適用於對任務執行順序有嚴格要求的場景。

ScheduledThreadPoolExecutor

public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } 

和ScheduledThreadPoolExecutor原生默認的構造器差別不大。

SingleThreadScheduledPoolExecutor

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); } 

控制線程只有一個。每個任務會按照順序先后被執行。

小結

可以看到,Executors只是按照一些比較大的情景方向,對線程池的參數進行簡單的配置。那可能會問:那我直接使用線程池的構造器自己設置不就完事了?

確實,阿里巴巴開發者手冊也是這樣建議的。盡量使用原生的構造器來創建線程池對象,這樣我們可以根據實際的情況配置出更加規范的線程池。Executors中的線程池在一些極端的情況下都可能會發生OOM,那么我們自己配置線程池時就要盡量避免這個問題。

最后

關於線程池的使用總體上就介紹到這里,線程池有非常多的優點,希望下次需要創建線程的時候,不會只記得 new Thread 。

下一篇將深入線程池的內部實現原理,如果了解過Android的Handler機制會發現兩者的設計幾乎一模一樣,也是非常有趣的。

希望文章對你有幫助。

參考文獻

  • 《Java並發編程的藝術》:並發編程必讀,作者對一些原理講的很透徹
  • 《Java核心技術卷》:這系列的書主要是講解框架的使用,不會深入原理,適合入門
  • javaGuide:javaGuide,對java知識總結得很不錯的一個博客
  • Java並發編程:線程池的使用:博客園上一位很優秀的博主,文章寫得通俗易懂且不失深度

全文到此,原創不易,覺得有幫助可以點贊收藏評論轉發。
筆者才疏學淺,有任何想法歡迎評論區交流指正。
如需轉載請評論區或私信交流。

另外歡迎光臨筆者的個人博客:傳送門

 
分類:  Java


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM