輕量級分布式延時任務處理組件easyTask-L-入門篇


  今天給大家介紹一款新武器。我自研的一個java組件easyTask-L。這個是做啥的呢?我之前研發了一款單機版本的easyTask,這次是要介紹另外一款easyTask-L。區別就是后者支持分布式環境,任務數據支持多個備份,具備了真正意義上的高可用。同時它又是輕量級的分布式應用,原因是因為它還不是一個獨立的中間件,它需要一個宿主程序才能使用。做成獨立的中間件是我后面要繼續做的一個版本。

  組件開源地址:https://github.com/liuche51/easyTask-L

  廢話不多說,先來介紹下easyTask-L組件的特性。

                 

  高可用:因為我們是分布式leader-follow集群,每個任務多有多個備份數據,如果節點宕機,集群將自動選舉。所以可靠性非常高

  秒級觸發:我們是采用時鍾秒級分片的數據結構,支持秒級觸發任務。不早也不遲

  分布式:組件支持分布式

  高並發:支持多線程同時提交任務,支持多線程同時執行任務

  數據一致性:使用TCC事務機制,保障數據在集群中的強一致性

  海量任務:節點可以存儲非常多的任務,只要內存和磁盤足夠。觸發效率也是極高。需要配置好分派任務線程池和執行任務線程池大小即可

  開源:組件完全在GitHub上開源。任何人都可以隨意使用,在不侵犯著作權情況下

  易使用:無需獨立部署集群,嵌入式開發。不過多的依賴於第三方中間件,除了zookeeper。

特別適合以下場景使用:

  • 乘坐網約車結單后30分鍾若顧客未評價,則系統將默認提交一條評價信息
  • 銀行充值接口,要求5分鍾后才可以查詢到結果成功OR失敗
  • 會員登錄系統30秒后自動發送一條登錄短信通知
  • 每個登錄用戶每隔10秒統計一次其某活動中獲得的積分

easyTask-L組件的整體架構如下:

  整體采用分布式設計,leader-follow風格。集群中每一個節點都是leader,同時也可能是其他某個節點的follow。每個leader都有若干個follow。leader上提交的新任務都會強制同步到follow中,刪除任務同時也會強制刪除follow中的備份任務。集群中所有節點都會在zookeeper中注冊並維持心跳。

  為了能更好的可用性,建議集群節點數不少於4個,這樣其中一個節點宕機,就能立即得到補充。否則可能導致集群不可用。

easyTask-L組件的核心“環形隊列”的設計架構如下:

  環形隊列在之前單機版的easyTask中也講過,原理都是類似的。客戶端提交任務,服務端先將任務進行持久化,再添加上環形隊列這個數據結構中去,等待時間片輪詢的到來。不同的是這里的持久化機制,改成了分布式存儲了。不僅leader自己存儲起來,還要同步存儲到其follow中去。刪除一個任務也是類似的過程。

  任務添加時會計算其觸發所屬的時間分片槽,等環形隊列的始終秒針到達時會判斷任務是否可以被執行了。如果可以執行了,則分派任務線程池將其丟入執行任務線程池等待執行。只要執行任務線程池線程數足夠,任務將立即得到執行。

   大概的原理清晰了,接下來就是寫個HelloWorld程序了!

  easyTask-L不是一個中間件,所以需要一個宿主程式。建議在微服務框架如:dubbo、spring-cloud中使用此組件,並建立一個獨立的專門用於處理延時任務的服務模塊。這樣可以使服務盡可能少的頻繁更新重啟。保持集群的穩定性。下面我將以一個springboot應用為例來給大家演示如何使用easyTask-L組件。測試環境:JDK1.8、zookeeper3.4.8

  第一步:引入jar包

 

   如果你是Maven項目,可以使用如下方式配置引入jar包。這可以讓項目自動引入easyTask-L中依賴的其他第三方jar包。最新版本請在maven中央倉庫中查詢。請在pom.xml中加入以下引用

 <dependency>
       <groupId>com.github.liuche51</groupId>
       <artifactId>easyTask-L</artifactId>
       <version>1.0.5</version>
 </dependency>

  第二步:配置啟動環形隊列

  這里以springboot應用為例,在application.yml中做如下配置

server:
   port: 8081
spring:
   application:
      name: easyTask-L
easyTaskL:
   zkAddress: 127.0.0.1:2181
   taskStorePath: C:/db/node1
   serverPort: 2021
   sQLlitePoolSize: 5
   backupCount: 2
   dispatchPool:
      corePoolSize: 5
      maximumPoolSize: 50
   workPool:
      corePoolSize: 5
      maximumPoolSize: 50

  新建一個啟動配置類EasyTaskLConf.java

 1 package com.github.liuche51.easyTaskL.config;
 2 
 3 import com.github.liuche51.easyTask.core.AnnularQueue;
 4 import com.github.liuche51.easyTask.core.EasyTaskConfig;
 5 import org.slf4j.Logger;
 6 import org.slf4j.LoggerFactory;
 7 import org.springframework.beans.factory.annotation.Value;
 8 import org.springframework.context.annotation.Bean;
 9 import org.springframework.context.annotation.Configuration;
10 
11 import java.util.concurrent.LinkedBlockingQueue;
12 import java.util.concurrent.ThreadPoolExecutor;
13 
14 @Configuration
15 public class EasyTaskLConf {
16     private static Logger log = LoggerFactory.getLogger(EasyTaskLConf.class);
17     @Value("${easyTaskL.zkAddress}")
18     private String zkAddress;
19     @Value("${easyTaskL.taskStorePath}")
20     private String taskStorePath;
21     @Value("${easyTaskL.serverPort}")
22     private int serverPort;
23     @Value("${easyTaskL.sQLlitePoolSize}")
24     private int sQLlitePoolSize;
25     @Value("${easyTaskL.backupCount}")
26     private int backupCount;
27     @Value("${easyTaskL.dispatchPool.corePoolSize}")
28     private int dispatchCorePoolSize;
29     @Value("${easyTaskL.dispatchPool.maximumPoolSize}")
30     private int dispatchMaximumPoolSize;
31     @Value("${easyTaskL.workPool.corePoolSize}")
32     private int workPoolCorePoolSize;
33     @Value("${easyTaskL.workPool.maximumPoolSize}")
34     private int workPoolMaximumPoolSize;
35     @Bean
36     public AnnularQueue initAnnularQueue(){
37         try {
38             EasyTaskConfig config =new EasyTaskConfig();
39             config.setTaskStorePath(taskStorePath);
40             config.setServerPort(serverPort);
41             config.setSQLlitePoolSize(sQLlitePoolSize);
42             //config.setBackupCount(backupCount);
43             config.setZkAddress(zkAddress);
44             AnnularQueue annularQueue = AnnularQueue.getInstance();
45             config.setDispatchs(new ThreadPoolExecutor(dispatchCorePoolSize, dispatchMaximumPoolSize, 1000, java.util.concurrent.TimeUnit.MILLISECONDS,
46                     new LinkedBlockingQueue<Runnable>()));
47             config.setWorkers(new ThreadPoolExecutor(workPoolCorePoolSize, workPoolMaximumPoolSize, 1000, java.util.concurrent.TimeUnit.MILLISECONDS,
48                     new LinkedBlockingQueue<Runnable>()));
49             annularQueue.start(config);
50             return annularQueue;
51         }catch (Exception e){
52             log.error("",e);
53              return null;
54         }
55     }
56 
57 }
EasyTaskLConf.java

  第三步:建立延時任務處理類

  這個需要根據具體情況,創建你要處理的任務類。任務類都需要繼承Task 這個父類以及實現Runnable 的run接口。這里可以寫你的任務邏輯,getParam()可以獲取到你提交任務時傳入的參數。

package com.github.liuche51.easyTaskL.task;
import com.github.liuche51.easyTask.dto.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;

public class CusTask1 extends Task implements Runnable {
    private static Logger log = LoggerFactory.getLogger(CusTask1.class);
    @Override
    public void run() {
        Map<String, String> param = getParam();
        if (param != null && param.size() > 0) {
            log.info("任務1已執行!姓名:{} 生日:{} 年齡:{} 線程ID:{}", param.get("name"), param.get("birthday"), param.get("age"), param.get("threadid"));
        }
    }
}

  第四步:向環形隊列中添加任務

  新建一個Controller,增加以下Action方法。

@RequestMapping("/once")
@ResponseBody
public String once(@RequestParam("name") String name, @RequestParam("time") int time) {
	CusTask1 task1 = new CusTask1();
	task1.setEndTimestamp(ZonedDateTime.now().plusSeconds(time).toInstant().toEpochMilli());
	Map<String, String> param = new HashMap<String, String>() {
		{
			put("name", name);
			put("birthday", "1996-1-1");
			put("age", "28");
			put("threadid", String.valueOf(Thread.currentThread().getId()));
		}
	};
	task1.setParam(param);
	return AnnularQueue.getInstance().submitAllowWait(task1);
}

  完整的demo可以使用Git克隆我的一個開源項目:https://gitee.com/liuche/DubboServer.git  找到子項目easyTask-L-demo即可


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM