elastic-job動態添加定時任務


在elastic-job的使用過程中,我們會遇到動態添加定時任務的時候,但是官網上面並沒有對這塊內容進行說明。按照我的理解以及官網上面elastic-job的框架圖,ej的定時任務其實是存儲在zookeeper的一個個節點上面,所以通過給zookeeper添加對應的節點即可完成定時任務的添加動作。

下面上代碼:

import java.text.SimpleDateFormat;
import java.util.Date;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.exception.JobSystemException;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class DynamicAddJob implements SimpleJob{
	private static final String CRON_DATE_FORMAT = "ss mm HH dd MM ? yyyy";

	/***
	 * @param date 時間
	 * @return cron類型的日期
	 */
	public static String getCron(final Date date) {
		SimpleDateFormat sdf = new SimpleDateFormat(CRON_DATE_FORMAT);
		String formatTimeStr = "";
		if (date != null) {
			formatTimeStr = sdf.format(date);
		}
		return formatTimeStr;
	}


	public static void main(String[] args){
		ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-job.xml");
		ZookeeperRegistryCenter zookeeperRegistryCenter = context.getBean(ZookeeperRegistryCenter.class);
		long now = System.currentTimeMillis();
		for (int i = 0; i < 100; i++) {
			String cron = getCron(new Date(now + (i + 1) * 50000));
			JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("dynamicDemoJob-" + i, cron, 2).build();
			SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, DynamicAddJob.class.getCanonicalName());
			JobScheduler jobScheduler = new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build());
			try {
				jobScheduler.init();
			}catch (JobSystemException e){
				e.printStackTrace();
			}
		}
	}

	@Override
	public void execute(ShardingContext shardingContext) {
		switch (shardingContext.getShardingItem()){
			case 0:
				System.out.println("doing sharding 0...job name is "+shardingContext.getJobName());
				// do something by sharding item 0
				break;
			case 1:
				System.out.println("doing sharding 1...job name is "+shardingContext.getJobName());
				// do something by sharding item 1
				break;
		}
	}
}

這里用到比較重要的一個類是JobScheduler,這是lite-core里面一個比較核心的類,這個類其實就是我們的job,他的構造方法包含以下參數:

  • CoordinatorRegistryCenter regCenter:注冊中心,這里是zookeeper
  • LiteJobConfiguration liteJobConfig:定時任務的配置信息

這里可以看一下LiteJobConfiguration這個類,采用了設計模式中的建造者模式進行構建。可能看着會比較摸不着頭腦,里面的Builder跟平時的不太一樣,這里我們需要知道的是ej的源碼采用了lombok這個代碼簡化的工具,只需要通過注解的形式就能將我們平時所需要的get/set和構造器的內容在編譯時創建出來,不需要在代碼中體現,能夠大大簡化我們的代碼。

另外還遇到一個坑。這段代碼不能重復使用,第一次跑的時候沒問題,過段時間再次跑這個代碼時,會在init()處報錯,原因是我們新建的job根本不能被fire,我跟了進去。發現,job的cron表達式表示的時間還是以前的時間,這就奇怪了,明明我這邊配置了一個新的時間。通過debug,進入init方法中,發現他會更新job信息,而更新時,會去zk上面load配置信息,而zk的znode節點是老的節點,上面存儲的配置信息也是老的,所以這塊的cron表達式也是舊的時間,根本不會被執行,下面貼出源碼,供大家參考。

init()源碼:

    /**
     * 初始化作業.
     */
    public void init() {
        LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
        JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
        schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }

updateJobConfiguration()的源碼如下:

    /**
     * 更新作業配置.
     *
     * @param liteJobConfig 作業配置
     * @return 更新后的作業配置
     */
    public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
        configService.persist(liteJobConfig);
        return configService.load(false);
    }

load()源碼如下:

    /**
     * 讀取作業配置.
     * 
     * @param fromCache 是否從緩存中讀取
     * @return 作業配置
     */
    public LiteJobConfiguration load(final boolean fromCache) {
        String result;
        if (fromCache) {
            result = jobNodeStorage.getJobNodeData(ConfigurationNode.ROOT);
            if (null == result) {
                result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
            }
        } else {
            result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
        }
        return LiteJobConfigurationGsonFactory.fromJson(result);
    }

可以發現這塊load有兩種,一種是從緩存(這里的緩存使用Map來實現的TreeCache)中獲取getJobNodeData,一種是從注冊中心也就是zookeeper中獲取getJobNodeDataDirectly。load的時候,根據的是zk的路徑,其實也就是任務的jobName,所以我們要盡量避免任務名稱的重復。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM