1.場景描述
老項目需要多機部署,項目中有幾十個定時任務,一旦多機部署,定時任務就會重復執行,固定ip與錯開時間方案都存在較大弊端,最終采用的方案是:AOP+排他鎖的方式,軟件老王已驗證通過,介紹下,有需要的朋友可以參考下。
2.解決方案
軟件老王基本方案是采用:AOP+排他鎖的方式。
(1)目前老項目有幾十個定時任務,采用AOP的方式,可以保證代碼的無侵入(即使簡單的微侵入,例如增加幾行代碼,測試驗證的工作量也會比較大的)。
(2)采用排他鎖的方式,保證批處理的高可用,不重復執行。
2.1 AOP編程
Aop的概念就不說了,就是面向切面編程,通俗點就是統一處理一類問題,比如日志、請求鑒權等,剛開始不確定是否可行,系統中的批處理是使用spring注解的方式@Scheduled進行批處理,采用aop對注解@Scheduled進行編程,統一攔截批處理,代碼如下:
/**
* 軟件老王-AOP處理類
*/
@Aspect
@Component
public class ScheduledAspect {
@Autowired
ScheduleService scheduleService ;
@Pointcut( "@annotation(org.springframework.scheduling.annotation.Scheduled)")
public void scheduled() {
}
@Around("scheduled()")
public Object scheduled(ProceedingJoinPoint pjd) {
Object result = null;
String taskName = pjd.getSignature().getName();
try {
if (scheduleService.isInvoke(taskName)){
return result;
}
result = pjd.proceed();
scheduleService.end(taskName);
} catch (Throwable e) {
throw new RuntimeException(e);
}
return result;
}
}
說明:
(1)面向標簽編程
@Pointcut( "@annotation(org.springframework.scheduling.annotation.Scheduled)")
這樣注解會攔截標簽@Scheduled。
(2)使用aop的環繞標簽 @Around("scheduled()")
@before標簽拿不到執行完成狀態,需要使用環繞標簽@@Around,在標簽中可以拿到執行完成后狀態,以便放開鎖。
result = pjd.proceed();
(3)結合排他鎖使用
@Autowired
ScheduleService scheduleService ;
2.2 排他鎖
排他鎖,簡單來說就是通過數據庫總的標志位+版版號進行的控制.
軟件老王的代碼如下,:
/**
* 軟件老王-排他鎖服務類
*/
@Service
public class ScheduleService {
@Autowired
ScheduleClusterMapper scheduleClusterMapper;
public boolean isInvoke(String taskName) {
boolean isValid = false;
try {
ScheduleCluster carIndexEntity = scheduleClusterMapper.selectByTaskName(taskName);
int execute = carIndexEntity.getExecute();
String ip = InetAddress.getLocalHost().getHostAddress();
long currentTimeMillis = System.currentTimeMillis();
long time = carIndexEntity.getUpdatedate().getTime();
if (execute == 0) {
isValid = start(taskName, carIndexEntity.getVersion(), ip);
}
} catch (UnknownHostException e) {
e.printStackTrace();
}
return isValid;
}
//執行鎖機制,軟件老王
public boolean start(String taskName, int version, String ip) {
ScheduleCluster scheduleCluster = new ScheduleCluster();
scheduleCluster.setVersion(version);
scheduleCluster.setExecuteIp(ip);
scheduleCluster.setUpdatedate(DateUtil.getCurrentTime());
scheduleCluster.setTaskName(taskName);
scheduleCluster.setExecute(1);
int count = scheduleClusterMapper.updateByTaskName(scheduleCluster);
if (count > 0) {
return true;
}
return false;
}
//執行解鎖機制,軟件老王
public void end(String taskName) {
ScheduleCluster scheduleCluster = new ScheduleCluster();
scheduleCluster.setUpdatedate(DateUtil.getCurrentTime());
scheduleCluster.setTaskName(taskName);
scheduleCluster.setExecute(0);
scheduleClusterMapper.updateNormalByTaskName(scheduleCluster);
}
}
說明:
大的原理是在where條件后帶上版本號,在update中更新version+1,這樣通過影響數據庫的影響條數,來判斷是否拿到鎖。
(1)主類中調用start方法,該方法是更新批處理狀態,軟件老王這里設置了一個小點,在updateByTaskName的mybatis方法中,有個version+1的更新;
(2)end方法放在更新完成后,釋放鎖。
(3)其實還有一個點,可以考慮下,需要有個機制,比如出現異常情況,剛好批處理執行中,重啟服務了等,下次批處理執行前,假如鎖還未釋放,代碼中增加釋放鎖的機制。
2.3 數據庫相關
(1)數據庫表設計
(2)mybatis相關方法
(1)第一個是start對應方法,執行鎖和version增加。
<update id="updateByTaskName" parameterType="com.yutong.dmp.entity.ScheduleCluster">
update t_schedule_cluster
<set>
<if test="executeIp != null">
execute_ip = #{executeIp,jdbcType=VARCHAR},
</if>
<if test="version != null">
version = #{version,jdbcType=INTEGER} + 1,
</if>
<if test="execute != null">
execute = #{execute,jdbcType=INTEGER},
</if>
<if test="status != null">
status = #{status,jdbcType=VARCHAR},
</if>
<if test="createby != null">
createby = #{createby,jdbcType=VARCHAR},
</if>
<if test="createdate != null">
createdate = #{createdate,jdbcType=TIMESTAMP},
</if>
<if test="updateby != null">
updateby = #{updateby,jdbcType=VARCHAR},
</if>
<if test="updatedate != null">
updatedate = #{updatedate,jdbcType=TIMESTAMP},
</if>
</set>
where task_name = #{taskName,jdbcType=VARCHAR}
and version = #{version,jdbcType=INTEGER}
and status ='1'
</update>
(2)第二個是釋放鎖,更改excute為0。
<update id="updateNormalByTaskName" parameterType="com.yutong.dmp.entity.ScheduleCluster">
update t_schedule_cluster
<set>
<if test="executeIp != null">
execute_ip = #{executeIp,jdbcType=VARCHAR},
</if>
<if test="version != null">
version = #{version,jdbcType=INTEGER},
</if>
<if test="execute != null">
execute = #{execute,jdbcType=INTEGER},
</if>
<if test="status != null">
status = #{status,jdbcType=VARCHAR},
</if>
<if test="createby != null">
createby = #{createby,jdbcType=VARCHAR},
</if>
<if test="createdate != null">
createdate = #{createdate,jdbcType=TIMESTAMP},
</if>
<if test="updateby != null">
updateby = #{updateby,jdbcType=VARCHAR},
</if>
<if test="updatedate != null">
updatedate = #{updatedate,jdbcType=TIMESTAMP},
</if>
</set>
where task_name = #{taskName,jdbcType=VARCHAR}
and status ='1'
</update>
I’m 「軟件老王」,如果覺得還可以的話,關注下唄,后續更新秒知!歡迎討論區、同名公眾號留言交流!