compile "com.dangdang:elastic-job-lite-core:2.1.5"
compile "com.dangdang:elastic-job-lite-spring:2.1.5"
public interface SimpleJob extends ElasticJob {
/**
* 執行作業.
*
* @param shardingContext 分片上下文
*/
void execute(ShardingContext shardingContext);
}
注意這里面有一個shardingContext參數,看下源碼:
/**
* 分片上下文.
*
* @author zhangliang
*/
@Getter
@ToString
public final class ShardingContext {
/**
* 作業名稱.
*/
private final String jobName;
/**
* 作業任務ID.
*/
private final String taskId;
/**
* 分片總數.
*/
private final int shardingTotalCount;
/**
* 作業自定義參數.
* 可以配置多個相同的作業, 但是用不同的參數作為不同的調度實例.
*/
private final String jobParameter;
/**
* 分配於本作業實例的分片項.
*/
private final int shardingItem;
/**
* 分配於本作業實例的分片參數.
*/
private final String shardingParameter;
public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {
jobName = shardingContexts.getJobName();
taskId = shardingContexts.getTaskId();
shardingTotalCount = shardingContexts.getShardingTotalCount();
jobParameter = shardingContexts.getJobParameter();
this.shardingItem = shardingItem;
shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);
}
}
這里面有2個很重要的屬性:shardingTotalCount 分片總數(比如:2)、shardingItem 當前分片索引(比如:1),前面提到的性能擴容,就可以根據2個參數進行簡單的處理,假設在電商系統中,每天晚上有個定時任務,要統計每家店的銷量。商家id一般在表設計上是一個自增數字,如果總共2個分片(注:通常也就是部署2個節點),可以把 id為奇數的放到分片0,id為偶數的放到分片1,這樣2個機器各跑一半,相對只有1台機器而言,就快多了。
偽代碼如下:
public class TestJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
int shardIndx = shardingContext.getShardingItem();
if (shardIndx == 0) {
//處理id為奇數的商家
} else {
//處理id為偶數的商家
}
}
}
這個還可以進一步簡化,如果使用mysql查詢商家列表,mysql中有一個mod函數,直接可以對商家id進行取模運算
select * from shop where mod(shop_id,2)=0
如果把上面的2、0換成參數,mybatis中類似這樣:
select * from shop where mod(shop_id,#{shardTotal})=#{shardIndex}
這樣邏輯就轉換到sql中處理了,java代碼中把參數傳進來就行,連if都可以省掉。
2、接下來看看如何配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd">
<reg:zookeeper id="regCenter" server-lists="${zk_address}" namespace="my-xxx-job"
base-sleep-time-milliseconds="1000"
max-sleep-time-milliseconds="3000" max-retries="3"/>
<job:simple id="xxxJob" class="com.cnblogs.yjmyzz.XXXJob" registry-center-ref="regCenter"
cron="${xxxJob_cornExpress}"
sharding-total-count="2" sharding-item-parameters="0=A,1=B"/>
...
</beans>
與常規的spring配置幾乎沒啥區別,幾個要點如下:
a) 因為分片調度是基於zk的,所以要先配置zk注冊中心,其中${zk_address}大家可以改成實際的zk地址列表,比如:10.x.x.1:2181,10.x.x.2:2181,10.x.x.3:2181
b) 每個job中的corn屬性,就是quartz中的cornExpress表達式,然后sharding-total-count即總分片數,而sharding-item-parameters則是指定每個分片中的具體參數
(注:剛才的電商每天晚上算銷量,這個case其實只用到了分片索引、分片數,並不需要參數,所以這里隨便配置一個類似0=A, 1=B就可以了,如果有些業務場景,希望在知道分片索引的同時,還希望額外傳一些參數進來,就可以在這里配置你希望的參數,然后在execute中,也能讀到相應的參數)
3、控制台
elastic-job還提供了一個不錯的UI控制台,項目源代碼git clone到本地,mvn install就能得到一個elastic-job-lite-console-${version}.tar.gz的包,解壓,然后運行里面的bin/start.sh 就能跑起來,界面類似如下:

通過這個控制台,可以動態調整每個定時任務的觸發時間(即:cornExpress)。詳情可參考官網文檔-運維平台部分。
4、與spring-cloud/spring-boot的整合
如果是傳統的spring項目,按上面的步驟就可以無縫整合了,如果是spring-cloud/spring-boot,則稍微要復雜點。
由於spring-boot倡導零xml配置,所以大部分配置就用代碼替代了,先定義一個elasticJob的配置類:
@Data
@Configuration
public class ElasticJobConfig {
@Value("${rxQuartz.app.zkAddress}")
private String zkNodes;
@Value("${rxQuartz.app.namespace}")
private String namespace;
@Bean
public ZookeeperConfiguration zkConfig() {
return new ZookeeperConfiguration(zkNodes, namespace);
}
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
return new ZookeeperRegistryCenter(config);
}
}
上面這段代碼,主要是解決zk注冊中心的注入問題,然后各種xxxJob,由於要讓spring自動注入,需要打上component注解
@Component("xxxJob")
public class XXXJob extends AbstractJob {
...
}
然后在真正要用的地方,把他們組裝起來
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: yangjunming
*/
@Configuration
public class ElasticJobs {
@Autowired
@Qualifier("xxxJob")
public SimpleJob xxxJob;
@Autowired
private ZookeeperRegistryCenter regCenter;
@Bean(initMethod = "init")
public JobScheduler settlementJobScheduler(@Autowired @Qualifier("xxxJob") SimpleJob simpleJob,
@Value("${xxxJob.billCronExpress}") final String cron,
@Value("${xxxJob.shardingCount}") int shardingCount,
@Value("${xxxJob.shardingItemParameters}") String shardingItemParameters) {
return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingCount, shardingItemParameters));
}
private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
}
}
大功告成,祝大家周末擼碼愉快!
