分布式任務框架elastic-job 學習筆記


官方資料:https://github.com/dangdangdotcom/elastic-job

-------------------------------------------------------------------------------------

官方資料非常完整而且思路清晰,按照自己學習過程整理如下:

1、   何為分布式任務?

  自己理解,就是一件事情讓多台機器來完成。單機環境下,所有任務都是單個電腦獨立完成,分布式任務就是把任務按一定邏輯進行切分(也就是所謂的分片),分成幾個小的片段,然后分給不同的電腦,每台電腦執行其中的幾個片段。

分片概念:

  任務的分布式執行,需要將一個任務拆分為n個獨立的任務項,然后由分布式的服務器分別執行某一個或幾個分片項。

  例如:有一個遍歷數據庫某張表的作業,現有2台服務器。為了快速的執行作業,那么每台服務器應執行作業的50%。 為滿足此需求,可將作業分成2片,每台服務器執行1片。作業遍歷數據的邏輯應為:服務器A遍歷ID以奇數結尾的數據;服務器B遍歷ID以偶數結尾的數據。 如果分成10片,則作業遍歷數據的邏輯應為:每片分到的分片項應為ID%10,而服務器A被分配到分片項0,1,2,3,4;服務器B被分配到分片項5,6,7,8,9,直接的結果就是服務器A遍歷ID以0-4結尾的數據;服務器B遍歷ID以5-9結尾的數據。

2、   github源碼

  從github下載下來,maven導入eclipse后,分為5部分:

  

  官方目錄結構說明:

  elastic-job-core  //核心模塊,只通過Quartz和Curator就可執行分布式作業。

  elastic-job-spring  //對spring支持的模塊,包括命名空間,依賴注入,占位符等。

  elastic-job-console  // web控制台,可將編譯之后的war放入tomcat等servlet容器中使用。

  elastic-job-example  //使用示例。

  elastic-job-doc  //使用markdown生成文檔的項目,使用方無需關注。

  需要說明一下,官方也提到了,需要一個lombok.jar。個人感覺這個確實很不錯,有了這個jar包,可以省掉get set方法,在屬性很多的時候特別方便。

  *************************************************

  lombok 的官方網址:http://projectlombok.org/  

    lombok 注解在線幫助文檔:http://projectlombok.org/features/index.
      下面介紹幾個我常用的 lombok 注解:
        @Data   :注解在類上;提供類所有屬性的 getting 和 setting 方法,此外還提供了equals、canEqual、hashCode、toString 方法
        @Setter:注解在屬性上;為屬性提供 setting 方法
        @Getter:注解在屬性上;為屬性提供 getting 方法
        @Log4j :注解在類上;為類提供一個 屬性名為log 的 log4j 日志對象
        @NoArgsConstructor:注解在類上;為類提供一個無參的構造方法
        @AllArgsConstructor:注解在類上;為類提供一個全參的構造方法

  *************************************************

3、   快速上手部署應用(單機跟集群)

  快速上手可以參照官方文檔:   http://dangdangdotcom.github.io/elastic-job/post/quick_start/

  這是個單機環境的例子。為加深理解,自己部署集群環境,步驟如下:

  a、啟動zookeeper(測試用,可單機可集群),步驟參見官方快速上手文檔

  b、修改官方example代碼如下:

       1)、在com.dangdang.example.elasticjob.spring包下新建myjob包,創建新類MySimpleJobTest.java,代碼如下:    

@Component
public class MySimpleJobTest extends AbstractSimpleElasticJob {

    private PrintContext printContext = new PrintContext(SimpleJobDemo.class);
    
    @Resource
    private FooRepository fooRepository;
    
    private static AtomicInteger count = new AtomicInteger(0);
	
	@Override
	public void process(JobExecutionMultipleShardingContext shardingContext) {
	   System.out.println("第"+count.addAndGet(1)+"次執行,當前分片號為:"+shardingContext.getShardingItemParameters());
	}

}

  2)、修改resources/META-INF/withNamespace.xml為:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" 
    xmlns:job="http://www.dangdang.com/schema/ddframe/job" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
                        http://www.springframework.org/schema/beans/spring-beans.xsd 
                        http://www.springframework.org/schema/context 
                        http://www.springframework.org/schema/context/spring-context.xsd 
                        http://www.dangdang.com/schema/ddframe/reg 
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd 
                        http://www.dangdang.com/schema/ddframe/job 
                        http://www.dangdang.com/schema/ddframe/job/job.xsd 
                        ">
    <context:component-scan base-package="com.dangdang.example.elasticjob" />
    <context:property-placeholder location="classpath:conf/*.properties" />
    
    <reg:zookeeper id="regCenter" serverLists="${serverLists}" namespace="${namespace}" baseSleepTimeMilliseconds="${baseSleepTimeMilliseconds}" maxSleepTimeMilliseconds="${maxSleepTimeMilliseconds}" maxRetries="${maxRetries}" nestedPort="${nestedPort}" nestedDataDir="${nestedDataDir}" />
    
<!--     <job:bean id="simpleElasticJob" class="com.dangdang.example.elasticjob.spring.job.SimpleJobDemo" regCenter="regCenter" shardingTotalCount="${simpleJob.shardingTotalCount}" cron="${simpleJob.cron}" shardingItemParameters="${simpleJob.shardingItemParameters}" monitorExecution="${simpleJob.monitorExecution}" monitorPort="${simpleJob.monitorPort}" failover="${simpleJob.failover}" description="${simpleJob.description}" disabled="${simpleJob.disabled}" overwrite="${simpleJob.overwrite}" /> -->
<!--     <job:bean id="throughputDataFlowJob" class="com.dangdang.example.elasticjob.spring.job.ThroughputDataFlowJobDemo" regCenter="regCenter" shardingTotalCount="${throughputDataFlowJob.shardingTotalCount}" cron="${throughputDataFlowJob.cron}" shardingItemParameters="${throughputDataFlowJob.shardingItemParameters}" monitorExecution="${throughputDataFlowJob.monitorExecution}" failover="${throughputDataFlowJob.failover}" processCountIntervalSeconds="${throughputDataFlowJob.processCountIntervalSeconds}" concurrentDataProcessThreadCount="${throughputDataFlowJob.concurrentDataProcessThreadCount}" description="${throughputDataFlowJob.description}" disabled="${throughputDataFlowJob.disabled}" overwrite="${throughputDataFlowJob.overwrite}" /> -->
<!--     <job:bean id="sequenceDataFlowJob3" class="com.dangdang.example.elasticjob.spring.job.SequenceDataFlowJobDemo" regCenter="regCenter" shardingTotalCount="${sequenceDataFlowJob.shardingTotalCount}" cron="${sequenceDataFlowJob.cron}" shardingItemParameters="${sequenceDataFlowJob.shardingItemParameters}" monitorExecution="${sequenceDataFlowJob.monitorExecution}" failover="${sequenceDataFlowJob.failover}" processCountIntervalSeconds="${sequenceDataFlowJob.processCountIntervalSeconds}" maxTimeDiffSeconds="${sequenceDataFlowJob.maxTimeDiffSeconds}" description="${sequenceDataFlowJob.description}" disabled="${sequenceDataFlowJob.disabled}" overwrite="${sequenceDataFlowJob.overwrite}" /> -->
	
	<job:bean id="simpleElasticJob2" class="com.dangdang.example.elasticjob.spring.myjob.MySimpleJobTest" regCenter="regCenter" shardingTotalCount="${simpleJob.shardingTotalCount}" cron="${simpleJob.cron}" shardingItemParameters="${simpleJob.shardingItemParameters}" monitorExecution="${simpleJob.monitorExecution}" monitorPort="${simpleJob.monitorPort}" failover="${simpleJob.failover}" description="${simpleJob.description}" disabled="${simpleJob.disabled}" overwrite="${simpleJob.overwrite}" />

</beans>

  

  c、建虛擬機(多台電腦的用另一電腦即可),配置環境變量。

      此處本地采用ubuntu16.04的64位版本.

     需配置的有:jdk,maven,為了讓maven能在本地找到jar包,而不再浪費時間去網絡maven庫下載,可以線運行mvn install生成.m2目錄(該目錄隱藏,本地虛擬機是位於/home下),將win下的.m2/repository文件夾拷貝到虛擬機的.m2下.

  d、將elastic-job-example拷貝到虛擬機,本地為/usr/mytest目錄

     修改虛擬機中example項目的配置文件/resources/conf/reg.properties

     serverLists為zookeeper服務器地址

     nestedPort設置為-1,不啟動自帶zookeeper(兩台電腦都不啟用默認zookeeper)

  e、虛擬機切換到elastic-job-example目錄(該目錄下有pom文件)

      運行:mvn compile ,運行完畢后

    運行:mvn exec:java -Dexec.mainClass="com.dangdang.example.elasticjob.spring.main"

  f、切回主機,com.dangdang.example.elasticjob.spring.main運行該文件的main方法

   可明顯看到:虛擬機開始單機運行時,處理分片為0-9,在主機開始運行后,變為5-9,兩者確實進行了任務分配:

  

  g、部署tomcat監控

      虛擬機部署tomcat,將elastic-job-console在主機打war包,然后放入tomcat的webapp下,啟動tomcat,訪問http://ip:端口號/elastic-job-console,賬號密碼:root/root

    填寫zookeeper地址,作業名稱等,可以看到控制頁面:

    

    我們剛剛部署的測試環境:

      

  小結:

    單純開發使用的話,方式之一是:將elastic-job-core跟elastic-job-spring打jar包,然后按照官方的開發指南,重寫相關方法即可。部署的話應該是按照上方集群部署的方式進行的。

    至於具體分片怎么分(官方提供了幾種方式,直接配置屬性),具體內部調度原理,開發過程中具體應用等細節仍待思考。

      補充:

           ej的使用,方式之一是如上所說,jar引入,重寫方法然后集群部署;方式之二是單獨寫一個調度項目,在此處進行“調度”,將分片信息以參數形式傳遞給遠程方法接口,從而實現了將一個大的任務分割給了多個不同機器(這里邊很可能由於遠程也是分布式,

      從而可能導致某機器多次接收之類,可能會並不那么均衡),從而減輕了單機壓力。具體分片邏輯跟接口邏輯根據具體業務場景的不同而不同。東西是死的,具體怎么個用法,正如當當網張亮所言:怎么用都可以。

           基於方式二的使用方式,即使不用ej框架,單純的一個項目C用於定時請求某個遠程接口,只要該遠程接口是集群部署的,那么負載就會分發到不同的機器,從而導致某種程度上實現了多機器執行,雖然這只是分布式帶來的福利而已。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM