一:問題的引出與復現
在一個風和日麗的工作日,公司運營發現系統的任務數據沒有推送執行,整個流程因此停住了。我立馬遠程登陸服務器,查看日志,好家伙,系統在瘋狂的打印相同的一段日志:c.d.d.j.i.e.LeaderElectionService [traceId=] - Elastic job: leader node is electing, waiting for 100 ms at server '192.168.0.6'

第一反應就是基建出問題了,無奈和運營商量,准備重啟項目服務,重啟后,問題立刻解決,業務也正常運行了。
有句話說得好,你覺得可能再次出現的問題,一定會再次出現。忘記了多少天后(開發初期,業務很緊張,這個問題沒有時間及時去處理),又有別的定時任務也不執行了,出現的問題也是一模一樣。
同一個問題在生產出現了兩次,已經必須要去解決了,首先去網上搜索下,也有網友遇到過這種問題,但是下面的回復卻是說:”Elastic job正在選舉主節點,等它選完就正常了。“
先說下當時生產正在用的就是 com.dangdang.elastic-job-core,是當當網開源的一個分布式調度的組件,在上家公司三個機器節點做的集群用了很長時間也重來沒有遇到這個問題,
當時就納悶了,難道是有什么配置設置的不對,導致它無法正常選主嗎?
然后花了點時間,自己搭了一個項目,准備去仔細分析debug下它的源碼,在這兒就發現,每次遠程debug的時候,一兩分鍾后,項目日志就會復現 c.d.d.j.i.e.LeaderElectionService [traceId=] - Elastic job: leader node is electing, waiting for 100 ms at server '192.168.0.6' 。
因此大膽猜測,因為debug導致Elastic job和注冊中心心跳鏈接超時了,而生產環境的系統也可能因為網絡抖動或者IO的壓力,導致這個問題。
二:ElasticJob簡單使用
2020年6月,經過Apache ShardingSphere社區投票,接納ElasticJob為其子項目。目前ElasticJob的四個子項目已經正式遷入Apache倉庫。
http://shardingsphere.apache.org/elasticjob/index_zh.html 最新的3.x版本在開源社區的幫助下,相比之前已經有了很大的優化,當然經過測試,也完美解決了選主的問題。
大致翻閱一下官方文檔,下面就准備接入測試下。
引入maven依賴
<dependency> <groupId>org.apache.shardingsphere.elasticjob</groupId> <artifactId>elasticjob-lite-core</artifactId> <version>${latest.release.version}</version> </dependency>
# Spring 命名空間,可以與 Spring 容器配合使用
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-spring-namespace</artifactId>
<version>3.0.0-beta</version>
</dependency>
# zk的版本要求3.6.0 以上
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.2</version>
</dependency>
elasticjob.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:elasticjob="http://shardingsphere.apache.org/schema/elasticjob" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://shardingsphere.apache.org/schema/elasticjob http://shardingsphere.apache.org/schema/elasticjob/elasticjob.xsd "> <elasticjob:zookeeper id="regCenter2" server-lists="${zkHost}" namespace="${elastic.job.namespace}" base-sleep-time-milliseconds="${elastic.job.baseSleepTimeMilliseconds}" max-sleep-time-milliseconds="${elastic.job.maxSleepTimeMilliseconds}" max-retries="${elastic.job.maxRetries}"/>
<elasticjob:job id="apacheTestJob" job-ref="apacheTestJob" registry-center-ref="regCenter2" sharding-total-count="${apacheTestJob.shardingTotalCount}" cron="${apacheTestJob.cron}" failover="${apacheTestJob.failover}" description="${apacheTestJob.description}" disabled="${apacheTestJob.disabled}" overwrite="${apacheTestJob.overwrite}" job-executor-service-handler-type="SINGLE_THREAD"/> <bean id="apacheTestJob" class="com.yxy.nova.elastic.job.ApacheTestJob" /> </beans>
可配置屬性:
| 屬性名 | 是否必填 |
|---|---|
| id | 是 |
| class | 否 |
| job-ref | 否 |
| registry-center-ref | 是 |
| tracing-ref | 否 |
| cron | 是 |
| sharding-total-count | 是 |
| sharding-item-parameters | 否 |
| job-parameter | 否 |
| monitor-execution | 否 |
| failover | 否 |
| misfire | 否 |
| max-time-diff-seconds | 否 |
| reconcile-interval-minutes | 否 |
| job-sharding-strategy-type | 否 |
| job-executor-service-handler-type | 否 |
| job-error-handler-type | 否 |
| job-listener-types | 否 |
| description | 否 |
| props | 否 |
| disabled | 否 |
| overwrite | 否 |
1:cron 定時執行的表達式
2:sharding-total-count 總的分片數
3:job-sharding-strategy-type 分片策略
可以看它內置的三種策略,說明比較詳細,默認的是 平均分片策略。
下面再說說如何自定義分片策略,ElasticJob加載分片策略使用的是JDK的spi (Service Provider Interface)加載的。
要使用SPI比較簡單,只需要按照以下幾個步驟操作即可:
- 在META-INF/services目錄下創建一個以"接口全限定名"為命名的文件,內容為實現類的全限定名
- 接口實現類所在的jar包在classpath下
- 主程序通過java.util.ServiceLoader動態狀態實現模塊,它通過掃描META-INF/services目錄下的配置文件找到實現類的全限定名,把類加載到JVM
- SPI的實現類必須帶一個無參構造方法
首先自定義一個策略類MyJobShardingStrategy,實現 JobShardingStrategy
package com.nova.elastic.job; import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance; import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class MyJobShardingStrategy implements JobShardingStrategy { /** * Sharding job. * * @param jobInstances all job instances which participate in sharding * @param jobName job name * @param shardingTotalCount sharding total count * @return sharding result */ @Override public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount) { Map<JobInstance, List<Integer>> result = new HashMap<>(); List<Integer> shardingItems = new ArrayList<>(shardingTotalCount + 1); for (int i=0; i<shardingTotalCount; i++) { shardingItems.add(i); } result.put(jobInstances.get(0), shardingItems); return result; } /** * Get type. * * @return type */ @Override public String getType() { return "MY_TEST"; } }
然后我們只需要在自己項目的resources下,建一個META-INF/services的文件夾,再創建以 a接口的全限定名(org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy),內容則為”com.nova.elastic.job.MyJobShardingStrategy“

這樣ElasticJob的主程序通過java.util.ServiceLoader就可以把我們自定義的策略類加載好。
最后就可以在xml中,job-sharding-strategy-type="MY_TEST", 配置使用自定義的分片策略。
三:存在的問題
我模擬了 1 台作業服務器且分片總數為2,則分片結果為:1=[0,1],然后我再自己的調度任務中打印了 shardingContext,
2021-06-08 16:33:35.029 [] INFO c.y.n.e.j.ApacheTestJob [traceId=] - ShardingContext(jobName=apacheTestJob-no-repeat, taskId=apacheTestJob-no-repeat@-@0,1@-@READY@-@172.16.0.4@-@23146, shardingTotalCount=2, jobParameter=, shardingItem=0, shardingParameter=null) 2021-06-08 16:33:35.029 [] INFO c.y.n.e.j.ApacheTestJob [traceId=] - ShardingContext(jobName=apacheTestJob-no-repeat, taskId=apacheTestJob-no-repeat@-@0,1@-@READY@-@172.16.0.4@-@23146, shardingTotalCount=2, jobParameter=, shardingItem=1, shardingParameter=null)
可以看到,在這種配置條件下,ApacheTestJob 是同時執行兩次,只有 shardingItem 有區別,那么這樣就會存在一個問題,我job的代碼邏輯就會執行兩次,只不過每次的shardingItem不同而已。
如果業務邏輯需要查詢數據庫,那么這樣就select了多次,在數據庫有瓶頸的系統下,效率肯定低。
反之,如果在這個配置下,調度任務只被調度一次,但是 ShardingContext 可以保存一個 shardingItem的列表,這樣就可以解決多次查詢數據庫的問題。
這也是用了這兩種ElasticJob后,感受到的最大的區別。
不知道有沒有正在使用 shardingsphere.elasticjob的小伙伴,你們的系統是如何使用的?有沒有存在相同的疑惑?又是如何解決這個問題的?
