elastic-job的原理簡介和使用


轉載:http://blog.csdn.net/fanfan_v5/article/details/61310045

 

elastic-job是當當開源的一款非常好用的作業框架,在這之前,我們開發定時任務一般都是使用quartz或者spring-task(ScheduledExecutorService),無論是使用quartz還是spring-task,我們都會至少遇到兩個痛點:
1.不敢輕易跟着應用服務多節點部署,可能會重復多次執行而引發系統邏輯的錯誤。
2.quartz的集群僅僅只是用來HA,節點數量的增加並不能給我們的每次執行效率帶來提升,即不能實現水平擴展。

本篇博文將會自頂向下地介紹elastic-job,讓大家認識了解並且快速搭建起環境。

 

elastic-job產品線說明

 

elastic-job在2.x之后,出了兩個產品線:Elastic-Job-Lite和Elastic-Job-Cloud。我們一般使用Elastic-Job-Lite就能夠滿足需求,本文也是以Elastic-Job-Lite為主。1.x系列對應的就只有Elastic-Job-Lite,並且在2.x里修改了一些核心類名,差別雖大,原理類似,建議使用2.x系列。寫此博文,最新release版本為2.0.5。

 

elastic-job-lite原理

 
舉個典型的job場景,比如余額寶里的昨日收益,系統需要job在每天某個時間點開始,給所有余額寶用戶計算收益。如果用戶數量不多,我們可以輕易使用quartz來完成,我們讓計息job在某個時間點開始執行,循環遍歷所有用戶計算利息,這沒問題。可是,如果用戶體量特別大,我們可能會面臨着在第二天之前處理不完這么多用戶。另外,我們部署job的時候也得注意,我們可能會把job直接放在我們的webapp里,webapp通常是多節點部署的,這樣,我們的job也就是多節點,多個job同時執行,很容易造成重復執行,比如用戶重復計息,為了避免這種情況,我們可能會對job的執行加鎖,保證始終只有一個節點能執行,或者干脆讓job從webapp里剝離出來,獨自部署一個節點。
elastic-job就可以幫助我們解決上面的問題,elastic底層的任務調度還是使用的quartz,通過zookeeper來動態給job節點分片。
我們來看:
很大體量的用戶需要在特定的時間段內計息完成
我們肯定是希望我們的任務可以通過集群達到水平擴展,集群里的每個節點都處理部分用戶,不管用戶數量有多龐大,我們只要增加機器就可以了,比如單台機器特定時間能處理n個用戶,2台機器處理2n個用戶,3台3n,4台4n...,再多的用戶也不怕了。
使用elastic-job開發的作業都是zookeeper的客戶端,比如我希望3台機器跑job,我們將任務分成3片,框架通過zk的協調,最終會讓3台機器分別分配到0,1,2的任務片,比如server0-->0,server1-->1,server2-->2,當server0執行時,可以只查詢id%3==0的用戶,server1執行時,只查詢id%3==1的用戶,server2執行時,只查詢id%3==2的用戶。
任務部署多節點引發重復執行
在上面的基礎上,我們再增加server3,此時,server3分不到任務分片,因為只有3片,已經分完了。沒有分到任務分片的作業程序將不執行。
如果此時server2掛了,那么server2的分片項會分配給server3,server3有了分片,就會替代server2執行。
如果此時server3也掛了,只剩下server0和server1了,框架也會自動把server3的分片隨機分配給server0或者server1,可能會這樣,server0-->0,server1-->1,2。
這種特性稱之為彈性擴容,即elastic-job名稱的由來。
 

代碼演示

 
我們搭建環境通過示例代碼來演示上面的例子,elastic-job是不支持單機多實例的,通過zk的協調分片是以ip為單元的。很多同學上來可能就是通過單機多實例來學習,結果導致分片和預期不一致。這里沒辦法,只能通過多機器或者虛擬機,我們這里使用虛擬機,另外,由於資源有限,我們這里僅僅只模擬兩台機器。
 
節點說明:
本地宿主機器
zookeeper、job
192.168.241.1

虛擬機
job
192.168.241.128

環境說明:
Java
請使用JDK1.7及其以上版本。
Zookeeper
請使用Zookeeper3.4.6及其以上版本
Elastic-Job-Lite
2.0.5(2.x系列即可,最好是2.0.4及其以上,因為2.0.4版本有本人提交的少許代碼,(*^__^*) 嘻嘻……)

需求說明:
通過兩台機器演示動態分片
 
step1. 引入框架的jar包
[html]  view plain  copy
 
  1. <!-- 引入elastic-job-lite核心模塊 -->  
  2. <dependency>  
  3.     <groupId>com.dangdang</groupId>  
  4.     <artifactId>elastic-job-lite-core</artifactId>  
  5.     <version>2.0.5</version>  
  6. </dependency>  
  7. <!-- 使用springframework自定義命名空間時引入 -->  
  8. <dependency>  
  9.     <groupId>com.dangdang</groupId>  
  10.     <artifactId>elastic-job-lite-spring</artifactId>  
  11.     <version>2.0.5</version>  
  12. </dependency>  
step2. 編寫job
[java]  view plain  copy
 
  1. package com.fanfan.sample001;  
  2.   
  3. import com.dangdang.ddframe.job.api.ShardingContext;  
  4. import com.dangdang.ddframe.job.api.simple.SimpleJob;  
  5.   
  6. import java.util.Date;  
  7.   
  8. /** 
  9.  * Created by fanfan on 2016/12/20. 
  10.  */  
  11. public class MySimpleJob implements SimpleJob {  
  12.     @Override  
  13.     public void execute(ShardingContext shardingContext) {  
  14.         System.out.println(String.format("------Thread ID: %s, 任務總片數: %s, 當前分片項: %s",  
  15.                 Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()));  
  16.         /** 
  17.          * 實際開發中,有了任務總片數和當前分片項,就可以對任務進行分片執行了 
  18.          * 比如 SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem 
  19.          */  
  20.     }  
  21. }  
Step3. Spring配置
[html]  view plain  copy
 
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  4.        xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"  
  5.        xmlns:job="http://www.dangdang.com/schema/ddframe/job"  
  6.        xsi:schemaLocation="http://www.springframework.org/schema/beans  
  7.                         http://www.springframework.org/schema/beans/spring-beans.xsd  
  8.                         http://www.dangdang.com/schema/ddframe/reg  
  9.                         http://www.dangdang.com/schema/ddframe/reg/reg.xsd  
  10.                         http://www.dangdang.com/schema/ddframe/job  
  11.                         http://www.dangdang.com/schema/ddframe/job/job.xsd">  
  12.     <!--配置作業注冊中心 -->  
  13.     <reg:zookeeper id="regCenter" server-lists="192.168.241.1:2181" namespace="dd-job"  
  14.                    base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />  
  15.   
  16.     <!-- 配置作業-->  
  17.     <job:simple id="mySimpleJob" class="com.fanfan.sample001.MySimpleJob" registry-center-ref="regCenter"  
  18.                 sharding-total-count="2" cron="0/2 * * * * ?" overwrite="true" />  
  19.   
  20. </beans>  
 
Case1. 單節點


 
 
 
 
Case2. 增加一個節點

 
 
 
 
 
 
 
Case3. 斷開一個節點
 
 
 
 
 

作業類型

 
elastic-job提供了三種類型的作業:Simple類型作業、Dataflow類型作業、Script類型作業。這里主要講解前兩者。Script類型作業意為腳本類型作業,支持shell,python,perl等所有類型腳本,使用不多,可以參見github文檔。

SimpleJob需要實現SimpleJob接口,意為簡單實現,未經過任何封裝,與quartz原生接口相似,比如示例代碼中所使用的job。

Dataflow類型用於處理數據流,需實現DataflowJob接口。該接口提供2個方法可供覆蓋,分別用於抓取(fetchData)和處理(processData)數據。
可通過DataflowJobConfiguration配置是否流式處理。
流式處理數據只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直運行下去; 非流式處理數據則只會在每次作業執行過程中執行一次fetchData方法和processData方法,隨即完成本次作業。
實際開發中,Dataflow類型的job還是很有好用的。
 
比如拿余額寶計息來說:
 
[java]  view plain  copy
 
  1. package com.fanfan.sample001;  
  2.   
  3. import com.dangdang.ddframe.job.api.ShardingContext;  
  4. import com.dangdang.ddframe.job.api.dataflow.DataflowJob;  
  5.   
  6. import java.util.ArrayList;  
  7. import java.util.List;  
  8.   
  9. /** 
  10.  * Created by fanfan on 2016/12/23. 
  11.  */  
  12. public class MyDataFlowJob implements DataflowJob<User> {  
  13.   
  14.     /* 
  15.         status 
  16.         0:待處理 
  17.         1:已處理 
  18.      */  
  19.   
  20.     @Override  
  21.     public List<User> fetchData(ShardingContext shardingContext) {  
  22.         List<User> users = null;  
  23.         /** 
  24.          * users = SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem Limit 0, 30 
  25.          */  
  26.         return users;  
  27.     }  
  28.   
  29.     @Override  
  30.     public void processData(ShardingContext shardingContext, List<User> data) {  
  31.         for (User user: data) {  
  32.             System.out.println(String.format("用戶 %s 開始計息", user.getUserId()));  
  33.             user.setStatus(1);  
  34.             /** 
  35.              * update user 
  36.              */  
  37.         }  
  38.     }  
  39. }  
 
[html]  view plain  copy
 
  1. <job:dataflow id="myDataFlowJob" class="com.fanfan.sample001.MyDataFlowJob" registry-center-ref="regCenter"  
  2.               sharding-total-count="2" cron="0 0 02 * * ?" streaming-process="true" overwrite="true" />  

其它功能

上述介紹的是最精簡常用的功能。elastic-job的功能集還不止這些,比如像作業事件追蹤、任務監聽等,另外,elastic-job-lite-console作為一個獨立的運維平台還提供了用來查詢和操作任務的web頁面。
這些增強的功能讀者可以在github/elastic-job上自行學習,相信有了本篇博文的基礎,再閱讀那些文檔就特別簡單了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM