java后台異步任務執行器TaskManager


java后台異步任務執行器TaskManager

此方式基於MVC方式:

 

 

一,使用任務:

 1 @Resource
 2 private TaskManager taskManager;
 3 
 4 public string  commit(TradeStatus status) {
 5     if (status== TradeStatus.UNDERWAY) {
 6 
 7     // 執行任務
 8     taskManager.addTask(new Runnable() {
 9 
10         @Override
11         public void run() {
12             handleUnderway(status); //運行業程序
13         }
14     });
15 
16     } else {
17                 
18     }
19 
20 }
 /** * 返回業務處理狀態 * * * <p> * 此方法需要被異步調用 * </p> * */ public void handleUnderway(TradeSatus satus) { int waitTimeOut = 20*1000try { if (logger.isDebugEnabled()) { logger.debug("提交查詢支付狀態"); } Assert.notNull(trade, "交易對象不能為空"); int timeOut = 0; String result = null; while (true) { result = queryStatus(satus); //獲取狀態 if (result == TradeStatus.UNDERWAY) { try { if (timeOut > waitTimeOut) { logger.warn(String.format("查詢狀態結果超時.")); break; } timeOut += 5000; Thread.sleep(5000); // 5秒同步一次 } catch (InterruptedException e) { if (logger.isInfoEnabled()) { logger.info("等待支付結果被中斷"); } break; } } else { break;// 查到銀行結果  } } if (result != TradeStatus.UNDERWAY) { //處理結果  } if (logger.isDebugEnabled()) { logger.debug(String.format("查詢完成,查詢結果為:%s", result)); } } catch (Exception e) { logger.error(String.format("查詢支付結果異常"), e); throw new SimpleException("查詢支付結果異常", e); } } 

 

二:實現的service

public interface TaskManager {

    /**
     * @param task
     */
    void addTask(Runnable task);

    /**
     * @return
     */
    int getActiveCount();
    
    /**
     * 停止任務管理器
     */
    void stop();

}

 

三:service的實現類

package com.service.impl;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.annotation.Resource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.TaskExecutor;

import com.service.TaskManager;

/**
 * 后台異步任務執行器
 *
 */
public class TaskManagerImpl implements TaskManager, InitializingBean {

    private static Log logger = LogFactory.getLog(TaskManagerImpl.class);

    private ThreadPoolExecutor executer = null;

    private int corePoolSize = 5;

    private int maximumPoolSize = 50;

    private long keepAliveTime = 10;

    private TimeUnit unit = TimeUnit.MINUTES;//

    private BlockingQueue<Runnable> workQueue;

    private Thread t;
    
    @Resource
    private TaskExecutor taskExecutor; 

    @Override
    public void addTask(Runnable task) {

        taskExecutor.execute(task);
        
//        executer.execute(task);

    }

    @Override
    public int getActiveCount() {

        return 0;

    }

    /*
     * (non-Javadoc)
     * 
     * @see
     * org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
     */
    @Override
    public void afterPropertiesSet() throws Exception {

        if (executer == null) {
            
            workQueue = new LinkedBlockingQueue<Runnable>();

            executer = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadPoolExecutor.AbortPolicy());
            
            
            t = new Thread(new Runnable() {
                
                @Override
                public void run() {

                    while (true) {
                        
                        try {
                            Thread.sleep(120 * 1000);
                        
                            if (logger.isDebugEnabled()) {
                                
                                String msg = String.format("隊列大小:%d, 激活任務數: %d, poolSize: %d, 任務數:%d, 已處理任務數:%d", 
                                         workQueue.size(), getActiveCount(), executer.getPoolSize(), executer.getTaskCount(), executer.getCompletedTaskCount());
                                
                                logger.debug(msg);
                                
                            }
                        
                        } catch (InterruptedException e) {
                            return;
                        }
                        
                        
                    }
                    
                }
            });
            t.setDaemon(true);
            t.setName("異步任務健康檢查線程");
            t.start();
            
        }
        
    }

    @Override
    public void stop() {

        if (executer == null)
            return;

        try {
            logger.info("准備停止任務管理器");
            executer.shutdown();
            executer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {

            logger.info("任務被中止.");
        } finally {
            if (!executer.isTerminated()) {
                logger.info("取消未完成的任務.");
            }
            executer.shutdownNow();
            logger.info("任務管理器停止完成.");
        }
    }

    /**
     * @return the corePoolSize
     */
    public int getCorePoolSize() {
        return corePoolSize;
    }

    /**
     * @param corePoolSize
     *            the corePoolSize to set
     */
    public void setCorePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    /**
     * @return the maximumPoolSize
     */
    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    /**
     * @param maximumPoolSize
     *            the maximumPoolSize to set
     */
    public void setMaximumPoolSize(int maximumPoolSize) {
        this.maximumPoolSize = maximumPoolSize;
    }

    /**
     * @return the keepAliveTime
     */
    public long getKeepAliveTime() {
        return keepAliveTime;
    }

    /**
     * @param keepAliveTime
     *            the keepAliveTime to set
     */
    public void setKeepAliveTime(long keepAliveTime) {
        this.keepAliveTime = keepAliveTime;
    }

    /**
     * @return the unit
     */
    public TimeUnit getUnit() {
        return unit;
    }

    /**
     * @param unit
     *            the unit to set
     */
    public void setUnit(TimeUnit unit) {
        this.unit = unit;
    }

}

 

四:xml 配置

       <!-- 異步任務執行器配置 -->
    <bean id="taskManager" class="com.service.impl.TaskManagerImpl">
        <property name="corePoolSize" value="30"></property>
        <property name="maximumPoolSize" value="100"></property>
        <property name="keepAliveTime" value="10"></property>
    </bean>
    
    <!--
        任務執行器  
        pool-size="5-20", 表示線程池活躍的線程數為5,最大線程數為20;
        queue-capacity="100"    表示隊列大小為100
    -->
    <task:executor id="taskExecutor" keep-alive="30" pool-size="5-20" queue-capacity="100" rejection-policy="ABORT"/>

以上異步處理就完成了。

 

下面是摘錄下來的一些解釋:


  五。配置解釋
當一個任務通過execute(Runnable)方法欲添加到線程池時:
1、 如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。
2、 如果此時線程池中的數量等於 corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。
3、如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。
4、 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那么通過 handler所指定的策略來處理此任務。也就是:處理任務的優先級為:核心線程corePoolSize、任務隊列workQueue、最大線程 maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
5、 當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。

六 其它線程池
JDK的ThreadPoolExecutor
一個 ExecutorService,它使用可能的幾個池線程之一執行每個提交的任務,通常使用Executors 工廠方法配置。
線程池可以解決兩個不同問題:由於減少了每個任務調用的開銷,它們通常可以在執行大量異步任務時提供增強的性能,並且還可以提供綁定和管理資源(包括執行 集合任務時使用的線程)的方法。每個ThreadPoolExecutor 還維護着一些基本的統計數據,如完成的任務數。
為了便於跨大量上下文使用,此類提供了很多可調整的參數和擴展掛鈎。但是,強烈建議程序員使用較為方便的 Executors 工廠方法 Executors.newCachedThreadPool()(無界線程池,可以進行自動線程回收)、 Executors.newFixedThreadPool(int)(固定大小線程池)和 Executors.newSingleThreadExecutor()(單個后台線程),它們均為大多數使用場景預定義了設置。否則,在手動配置和調 整此類時,使用以下指導:
1、核心和最大池大小 
ThreadPoolExecutor 將根據 corePoolSize(參見 getCorePoolSize())和 maximumPoolSize(參見getMaximumPoolSize())設置的邊界自動調整池大小。當新任務在方法 execute(java.lang.Runnable) 中提交時,如果運行的線程少於 corePoolSize,則創建新線程來處理請求,即使其他輔助線程是空閑的。如果運行的線程多於 corePoolSize 而少於 maximumPoolSize,則僅當隊列滿時才創建新線程。如果設置的 corePoolSize 和 maximumPoolSize 相同,則創建了固定大小的線程池。如果將 maximumPoolSize 設置為基本的無界值(如Integer.MAX_VALUE),則允許池適應任意數量的並發任務。在大多數情況下,核心和最大池大小僅基於構造來設置,不 過也可以使用setCorePoolSize(int) 和 setMaximumPoolSize(int) 進行動態更改。
2、按需構造 
默認情況下,即使核心線程最初只是在新任務需要時才創建和啟動的,也可以使用方法 prestartCoreThread() 或prestartAllCoreThreads() 對其進行動態重寫。
3、創建新線程 
使用 ThreadFactory 創建新線程。如果沒有另外說明,則在同一個ThreadGroup 中一律使用Executors.defaultThreadFactory() 創建線程,並且這些線程具有相同的NORM_PRIORITY 優先級和非守護進程狀態。通過提供不同的 ThreadFactory,可以改變線程的名稱、線程組、優先級、守護進程狀態,等等。如果從newThread 返回 null 時ThreadFactory 未能創建線程,則執行程序將繼續運行,但不能執行任何任務。
4、保持活動時間 
如果池中當前有多於 corePoolSize 的線程,則這些多出的線程在空閑時間超過 keepAliveTime 時將會終止(參見 getKeepAliveTime(java.util.concurrent.TimeUnit))。這提供了當池處於非活動狀態時減少資源消耗的方 法。如果池后來變得更為活動,則可以創建新的線程。也可以使用方法setKeepAliveTime(long, java.util.concurrent.TimeUnit) 動態地更改此參數。使用Long.MAX_VALUETimeUnit.NANOSECONDS 的值在關閉前有效地從以前的終止狀態禁用空閑線程。
5、排隊 
所有 BlockingQueue 都可用於傳輸和保持提交的任務。可以使用此隊列與池大小進行交互:
如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。 
如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。 
如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。 
排隊有三種通用策略:
a、直接提交。工作隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用於立即運行任務的線程,則試圖把任務加入隊列將失敗,因此 會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集合時出現鎖定。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。
b、無界隊列。使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 線程都忙的情況下將新任務加入隊列。這樣,創建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。
c、有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以 最大限度地降低 CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。
6、被拒絕的任務 
當 Executor 已經關閉,並且 Executor 將有限邊界用於最大線程和工作隊列容量,且已經飽和時,在方法 execute(java.lang.Runnable) 中提交的新任務將被拒絕。在以上兩種情況下,execute 方法都將調用其RejectedExecutionHandler 的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四種預定義的處理程序策略:
在默認的 ThreadPoolExecutor.AbortPolicy 中,處理程序遭到拒絕將拋出運行時RejectedExecutionException。
在 ThreadPoolExecutor.CallerRunsPolicy 中,線程調用運行該任務的execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。
在 ThreadPoolExecutor.DiscardPolicy 中,不能執行的任務將被刪除。
在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除,然后重試執行程序(如果再次失敗,則重復此過程)。
定義和使用其他種類的 RejectedExecutionHandler 類也是可能的,但這樣做需要非常小心,尤其是當策略僅用於特定容量或排隊策略時。
7、掛鈎方法 
此類提供 protected 可重寫的 beforeExecute(java.lang.Thread, java.lang.Runnable) 和afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,這兩種方法分別在執行每個任務之前和之后調用。它們可用於操縱執行環境;例如,重新初始化 ThreadLocal、搜集統計信息或添加日志條目。此外,還可以重寫方法terminated() 來執行 Executor 完全終止后需要完成的所有特殊處理。
如果掛鈎或回調方法拋出異常,則內部輔助線程將依次失敗並突然終止。 
8、隊列維護 
方法 getQueue() 允許出於監控和調試目的而訪問工作隊列。強烈反對出於其他任何目的而使用此方法。remove(java.lang.Runnable) 和purge() 這兩種方法可用於在取消大量已排隊任務時幫助進行存儲回收。

 

偶遇晨光原創

2016-03-11


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM