今天給大家介紹一款新武器。我自研的一個java組件easyTask-L。這個是做啥的呢?我之前研發了一款單機版本的easyTask,這次是要介紹另外一款easyTask-L。區別就是后者支持分布式環境,任務數據支持多個備份,具備了真正意義上的高可用。同時它又是輕量級的分布式應用,原因是因為它還不是一個獨立的中間件,它需要一個宿主程序才能使用。做成獨立的中間件是我后面要繼續做的一個版本。
組件開源地址:https://github.com/liuche51/easyTask-L
廢話不多說,先來介紹下easyTask-L組件的特性。
高可用:因為我們是分布式leader-follow集群,每個任務多有多個備份數據,如果節點宕機,集群將自動選舉。所以可靠性非常高
秒級觸發:我們是采用時鍾秒級分片的數據結構,支持秒級觸發任務。不早也不遲
分布式:組件支持分布式
高並發:支持多線程同時提交任務,支持多線程同時執行任務
數據一致性:使用TCC事務機制,保障數據在集群中的強一致性
海量任務:節點可以存儲非常多的任務,只要內存和磁盤足夠。觸發效率也是極高。需要配置好分派任務線程池和執行任務線程池大小即可
開源:組件完全在GitHub上開源。任何人都可以隨意使用,在不侵犯著作權情況下
易使用:無需獨立部署集群,嵌入式開發。不過多的依賴於第三方中間件,除了zookeeper。
特別適合以下場景使用:
- 乘坐網約車結單后30分鍾若顧客未評價,則系統將默認提交一條評價信息
- 銀行充值接口,要求5分鍾后才可以查詢到結果成功OR失敗
- 會員登錄系統30秒后自動發送一條登錄短信通知
- 每個登錄用戶每隔10秒統計一次其某活動中獲得的積分
easyTask-L組件的整體架構如下:
整體采用分布式設計,leader-follow風格。集群中每一個節點都是leader,同時也可能是其他某個節點的follow。每個leader都有若干個follow。leader上提交的新任務都會強制同步到follow中,刪除任務同時也會強制刪除follow中的備份任務。集群中所有節點都會在zookeeper中注冊並維持心跳。
為了能更好的可用性,建議集群節點數不少於4個,這樣其中一個節點宕機,就能立即得到補充。否則可能導致集群不可用。
easyTask-L組件的核心“環形隊列”的設計架構如下:
環形隊列在之前單機版的easyTask中也講過,原理都是類似的。客戶端提交任務,服務端先將任務進行持久化,再添加上環形隊列這個數據結構中去,等待時間片輪詢的到來。不同的是這里的持久化機制,改成了分布式存儲了。不僅leader自己存儲起來,還要同步存儲到其follow中去。刪除一個任務也是類似的過程。
任務添加時會計算其觸發所屬的時間分片槽,等環形隊列的始終秒針到達時會判斷任務是否可以被執行了。如果可以執行了,則分派任務線程池將其丟入執行任務線程池等待執行。只要執行任務線程池線程數足夠,任務將立即得到執行。
大概的原理清晰了,接下來就是寫個HelloWorld程序了!
easyTask-L不是一個中間件,所以需要一個宿主程式。建議在微服務框架如:dubbo、spring-cloud中使用此組件,並建立一個獨立的專門用於處理延時任務的服務模塊。這樣可以使服務盡可能少的頻繁更新重啟。保持集群的穩定性。下面我將以一個springboot應用為例來給大家演示如何使用easyTask-L組件。測試環境:JDK1.8、zookeeper3.4.8
第一步:引入jar包
如果你是Maven項目,可以使用如下方式配置引入jar包。這可以讓項目自動引入easyTask-L中依賴的其他第三方jar包。最新版本請在maven中央倉庫中查詢。請在pom.xml中加入以下引用
<dependency> <groupId>com.github.liuche51</groupId> <artifactId>easyTask-L</artifactId> <version>1.0.5</version> </dependency>
第二步:配置啟動環形隊列
這里以springboot應用為例,在application.yml中做如下配置
server: port: 8081 spring: application: name: easyTask-L easyTaskL: zkAddress: 127.0.0.1:2181 taskStorePath: C:/db/node1 serverPort: 2021 sQLlitePoolSize: 5 backupCount: 2 dispatchPool: corePoolSize: 5 maximumPoolSize: 50 workPool: corePoolSize: 5 maximumPoolSize: 50
新建一個啟動配置類EasyTaskLConf.java

1 package com.github.liuche51.easyTaskL.config; 2 3 import com.github.liuche51.easyTask.core.AnnularQueue; 4 import com.github.liuche51.easyTask.core.EasyTaskConfig; 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 import org.springframework.beans.factory.annotation.Value; 8 import org.springframework.context.annotation.Bean; 9 import org.springframework.context.annotation.Configuration; 10 11 import java.util.concurrent.LinkedBlockingQueue; 12 import java.util.concurrent.ThreadPoolExecutor; 13 14 @Configuration 15 public class EasyTaskLConf { 16 private static Logger log = LoggerFactory.getLogger(EasyTaskLConf.class); 17 @Value("${easyTaskL.zkAddress}") 18 private String zkAddress; 19 @Value("${easyTaskL.taskStorePath}") 20 private String taskStorePath; 21 @Value("${easyTaskL.serverPort}") 22 private int serverPort; 23 @Value("${easyTaskL.sQLlitePoolSize}") 24 private int sQLlitePoolSize; 25 @Value("${easyTaskL.backupCount}") 26 private int backupCount; 27 @Value("${easyTaskL.dispatchPool.corePoolSize}") 28 private int dispatchCorePoolSize; 29 @Value("${easyTaskL.dispatchPool.maximumPoolSize}") 30 private int dispatchMaximumPoolSize; 31 @Value("${easyTaskL.workPool.corePoolSize}") 32 private int workPoolCorePoolSize; 33 @Value("${easyTaskL.workPool.maximumPoolSize}") 34 private int workPoolMaximumPoolSize; 35 @Bean 36 public AnnularQueue initAnnularQueue(){ 37 try { 38 EasyTaskConfig config =new EasyTaskConfig(); 39 config.setTaskStorePath(taskStorePath); 40 config.setServerPort(serverPort); 41 config.setSQLlitePoolSize(sQLlitePoolSize); 42 //config.setBackupCount(backupCount); 43 config.setZkAddress(zkAddress); 44 AnnularQueue annularQueue = AnnularQueue.getInstance(); 45 config.setDispatchs(new ThreadPoolExecutor(dispatchCorePoolSize, dispatchMaximumPoolSize, 1000, java.util.concurrent.TimeUnit.MILLISECONDS, 46 new LinkedBlockingQueue<Runnable>())); 47 config.setWorkers(new ThreadPoolExecutor(workPoolCorePoolSize, workPoolMaximumPoolSize, 1000, java.util.concurrent.TimeUnit.MILLISECONDS, 48 new LinkedBlockingQueue<Runnable>())); 49 annularQueue.start(config); 50 return annularQueue; 51 }catch (Exception e){ 52 log.error("",e); 53 return null; 54 } 55 } 56 57 }
第三步:建立延時任務處理類
這個需要根據具體情況,創建你要處理的任務類。任務類都需要繼承Task 這個父類以及實現Runnable 的run接口。這里可以寫你的任務邏輯,getParam()可以獲取到你提交任務時傳入的參數。
package com.github.liuche51.easyTaskL.task; import com.github.liuche51.easyTask.dto.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; public class CusTask1 extends Task implements Runnable { private static Logger log = LoggerFactory.getLogger(CusTask1.class); @Override public void run() { Map<String, String> param = getParam(); if (param != null && param.size() > 0) { log.info("任務1已執行!姓名:{} 生日:{} 年齡:{} 線程ID:{}", param.get("name"), param.get("birthday"), param.get("age"), param.get("threadid")); } } }
第四步:向環形隊列中添加任務
新建一個Controller,增加以下Action方法。
@RequestMapping("/once") @ResponseBody public String once(@RequestParam("name") String name, @RequestParam("time") int time) { CusTask1 task1 = new CusTask1(); task1.setEndTimestamp(ZonedDateTime.now().plusSeconds(time).toInstant().toEpochMilli()); Map<String, String> param = new HashMap<String, String>() { { put("name", name); put("birthday", "1996-1-1"); put("age", "28"); put("threadid", String.valueOf(Thread.currentThread().getId())); } }; task1.setParam(param); return AnnularQueue.getInstance().submitAllowWait(task1); }
完整的demo可以使用Git克隆我的一個開源項目:https://gitee.com/liuche/DubboServer.git 找到子項目easyTask-L-demo即可