異步任務和定時任務


異步任務和定時任務

異步任務(AOP)

在開發系統的過程中,通常會考慮到系統的性能問題,提升系統性能的一個重要思想就是“串行”改“並行”。說起“並行”自然離不開“異步”。

使用方式(共兩步)

第一步: 在springboot主 啟動類中假如一個@EnableAsync注解用於開啟異步任務, 如下

@EnableAsync // 開啟異步任務
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

第二步: 然后在需要異步操作的方法上加@Async注解,

如果此注解應用於類上, 即表明這個類中的所有方法在執行時都會異步執行

當在執行用此注解描述的方法的時候, 會開辟一個新的線程來執行這個方法

如下Service層代碼

@Service
@Async
public class UserServiceImpl implements UserService {

    @Resource
    private UserDao userDao;

    // 假設保存操作需要5秒鍾時間才能完成
    @Override
    public void update1() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        userDao.updateUser(new User());
    }

}

我們在controller中進行調用

@Controller
public class UserController {

    @Resource
    private UserService userService;

    @RequestMapping("/update")
    @ResponseBody
    public String update () {
        userService.update1();
        return "success";
    }

}

當我們訪問/update的url時, 頁面會瞬間顯示success

如果說service中沒有@Async注解, 即沒有開啟異步任務, 則需要等待5秒后, 頁面才能顯示success

當然, 我們在實際開發中, 不可能沒有保存成功就提示成功, 這是只是為了演示

實際應用場景

例如在我們記錄用戶的操作日志的時候, 不可能等日志記錄完成再實現正常的業務, 所以可以把機制的記錄改為異步操作即可

獲取異步任務結果

假如需要獲取業務層異步方法的執行結果,可參考如下代碼設計進行實現:

@Override
@Async
public Future<Integer> update1() {
    System.out.println("更新中...");
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    int row = userDao.updateUser(new User());
    return new AsyncResult<Integer>(row); // 使用AsyncResult封裝數據
}

其中,AsyncResult對象可以對異步方法的執行結果進行封裝,假如外界需要異步方法結果時,可以通過Future對象的get方法獲取結果。

原理及配置

對於@Async注解默認會基於ThreadPoolTaskExecutor對象獲取工作線程,然后調用由@Async描述的方法,讓方法運行於一個工作線程,以實現異步操作。但是假如系統中的默認拒絕處理策略,任務執行過程的異常處理不能滿足我們自身業務需求的話,我可以對異步線程池進行自定義.(SpringBoot中默認的異步配置可以參考自動配置對象TaskExecutionAutoConfiguration).

spring:
  task:
    execution:
      pool:
        core-size: 10 #核心線程數,當池中線程數沒達到core-size時,每來一個請求都創建一個新的線程
        queue-capacity: 256 #隊列容量,當核心線程都在忙,再來新的任務,會將任務放到隊列
        max-size: 128 #當核心線程都在忙,隊列也滿了,再來新的任務,此時會創建新的線程,直到達到maxSize
        keep-alive: 60s #(加s為秒, 不加為毫秒)當任務高峰過后,有些線程會空閑下來,這空閑現線程達到一定的時間會被釋放。
        allow-core-thread-timeout: false # 是否允許核心線程超時
      thread-name-prefix: service-task- # 線程名稱前綴

對於spring框架中線程池配置參數的涵義,可以參考ThreadPoolExecutor對象中的解釋。

定時任務

項目開發中經常需要執行一些定時任務,比如需要在每天凌晨的時候,分析一次前一天的日志信息,Spring為我們提供了異步執行任務調度的方式,提供了兩個接口。

  • TaskExecutor接口
  • TaskScheduler接口

兩個注解:

  • @EnableScheduling 開啟定時任務
  • @Scheduled 添加定時任務, 參數為cron表達式

cron表達式:

在線生成cron表達式: http://www.bejson.com/othertools/cron/

字段 允許值 允許的特殊字符
0~59 , - * /
0~59 , - * /
小時 0~23 , - * /
日期 1~31 , - * ? / L W C
月份 1~12 , - * /
星期 1~7或SUN-SAT(1=SUN) , - * ? / L W C
年(可選) 1970~2099 , - * /

特殊字符:

特殊字符 含義
, (逗號) 枚舉
-(減號) 區間
*(星號) 任意
/(左斜杠) 步長
?(問好) 日/星期沖突匹配
L(大寫L) 最后
W(大寫W) 工作日
C(大寫C) 和calendar聯系后計算過的值
#(井號) 星期, 4#2, 第二個星期三

定時任務案例

例如, 我們需要每5秒記錄一下日志

@Service // 1. 注入bean
@EnableScheduling   // 2.開啟定時任務
public class LogServiceImpl {

    //3.添加定時任務
    //秒    分    時     日    月    周幾
    @Scheduled(cron = "0/5 * * * * ?")
    //或直接指定時間間隔,例如:5秒
    //@Scheduled(fixedRate=5000) // 我們也可以使用這種方式(單位: 毫秒)
    private void configureTasks() {
        // 輸入日志
        System.err.println("執行靜態定時任務時間: " + LocalDateTime.now());
    }

}

注意, 定時任務的類必須交給spring管理

擴展: 自定義異步池(異步任務)

為了讓Spring中的異步池更好的服務於我們的業務,同時也盡量避免OOM,可以自定義線程池優化設計如下:關鍵代碼如下:

package com.cy.pj.common.config
@Slf4j
@Setter
@Configuration
@ConfigurationProperties("async-thread-pool")
public class SpringAsyncConfig implements AsyncConfigurer{
    /**核心線程數*/
	private int corePoolSize=20;
	/**最大線程數*/
	private int maximumPoolSize=1000;
	/**線程空閑時間*/
	private int keepAliveTime=30;
	/**阻塞隊列容量*/
	private int queueCapacity=200;
	/**構建線程工廠*/
	private ThreadFactory threadFactory=new ThreadFactory() {
		//CAS算法
		private AtomicInteger at = new AtomicInteger(1000);
		@Override
		public Thread newThread(Runnable r) {
			return new Thread(r, 
"db-async-thread-"+at.getAndIncrement());
		}
	};	
    
	@Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maximumPoolSize);
        executor.setKeepAliveSeconds(keepAliveTime);
        executor.setQueueCapacity(queueCapacity);
        executor.setRejectedExecutionHandler((Runnable r, 
 ThreadPoolExecutor exe) -> {
                log.warn("當前任務線程池隊列已滿.");
        });
        executor.initialize();
        return executor;
    }
 
    @Override
    public AsyncUncaughtExceptionHandler 
        		getAsyncUncaughtExceptionHandler() {
        return new AsyncUncaughtExceptionHandler() {
            @Override
            public void handleUncaughtException(
                	Throwable ex ,
                    Method method , 
                	Object... params) {
                log.error("線程池執行任務發生未知異常.", ex);
            }
        };
    }
}

其中:@ConfigurationProperties("async-thread-pool")的含義是讀取application.yml配置文件中以"async-thread-pool"名為前綴的配置信息,並通過所描述類的set方法賦值給對應的屬性,在application.yml中連接器池的關鍵配置如下:

async-thread-pool:
       corePoolSize: 20
       maxPoolSize: 1000
       keepAliveSeconds: 30
       queueCapacity: 1000

后續在業務類中,假如我們使用@Async注解描述業務方法,默認會使用ThreadPoolTaskExecutor池對象中的線程執行異步任務。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM