基於Redis實現分布式定時任務調度


 

項目開發過程中,難免會有許多定時任務的需求進來。如果項目中還沒有引入quarzt框架的情況下,我們通常會使用Spring的@Schedule(cron="* * * * *")注解

樣例如下:

package com.slowcity.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;

public class SentMailTask {
    private static final Logger log = LoggerFactory.getLogger(SentMailTask.class);
   /**
    * 定時任務
    */
    @Scheduled(cron = "0 0/1 * * * ? ")
    public void closeOrderTaskV1() {
        log.info(".........schedule task start.........");
        
        sentMailToCustomer();
        
        log.info(".........schedule task end.........");
    }
     
    public void sentMailToCustomer() {
        log.info(".........sent mail to customer.........");
    }
}
 

這樣實現自然是沒有什么問題,對於單台機器部署,任務每一分鍾執行一次。部署多台機器時,同一個任務會執行多次

在我們的項目當中,使用定時任務是避免不了的,我們在部署定時任務時,通常只部署一台機器,此時可用性又無法保證現實情況是獨立的應用服務通常會部署在兩台及以上機器的時候,假如有3台機器,則會出現同一時間3台機器都會觸發的情況,結果就是會向客戶發送三封一模一樣的郵件,真讓人頭疼。如果使用quarzt,就不存在這個情況了。

這種並發的問題,簡單點說是鎖的問題,具體點是分布式鎖的問題,所以在這段代碼上加個分布式鎖就可以了。分布式鎖,首先想到的是redis,畢竟輪子都是現成的。

package com.slowcity.redis;

import java.util.Collections;
import redis.clients.jedis.Jedis;

public class RedisPool {
    private static final String LOCK_SUCCESS="OK";
    private static final String SET_IF_NOT_EXIST="NX";
    private static final String SET_WITH_EXPIRE_TIME="PX";
    private static final Long RELEASE_SUCCESS=1L;
    
    /**
     * 獲取分布式鎖
     * @param jedis
     * @param lockKey
     * @param requestID
     * @param expireTime
     * @return
     */
    public static boolean getDistributedLock(Jedis jedis,String lockKey,String requestId,int expireTime) {
        String result = jedis.set(lockKey,requestId,SET_IF_NOT_EXIST,SET_WITH_EXPIRE_TIME,expireTime);
        if(LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;
        
    }
    /**
     * 釋放分布式鎖
     * @param jedis
     * @param lockKey
     * @param requestId
     * @return
     */
    public static boolean releaseDistributedLock(Jedis jedis,String lockKey,String requestId) {
        String script = "if redis.call('get',KEYS[1])== ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
        Object result = jedis.eval(script,Collections.singletonList(lockKey),Collections.singletonList(requestId));
        if(RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }
}

改造一下定時任務,增加分布式鎖

package com.slowcity.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;

import redis.clients.jedis.Jedis;

public class SentMailTask {
    private static final Logger log = LoggerFactory.getLogger(SentMailTask.class);
   /**
    * 定時任務
    */
    @Scheduled(cron = "0 0/1 * * * ? ")
    public void closeOrderTaskV1() {
        log.info(".........schedule task start.........");
        Jedis jedis = new Jedis("10.2.1.17",6379);
        boolean locked = RedisPool.getDistributedLock(jedis, "", "", 10*1000);
        if(locked) {
            sentMailToCustomer();
        }
        RedisPool.releaseDistributedLock(jedis, "", "");
        jedis.close();
        log.info(".........schedule task end.........");
    }
     
    public void sentMailToCustomer() {
        log.info(".........sent mail to customer.........");
    }
}
 

再執行定時任務,多台機器部署,只執行一次。

關於jedis對象的獲取,一般都是springboot自動化配置的,所有會想到工廠方法。優化如下:

package com.slowcity.redis;

import java.lang.reflect.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnection;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.ReflectionUtils;
import redis.clients.jedis.Jedis;

public class SentMailTask {
    private static final Logger log = LoggerFactory.getLogger(SentMailTask.class);
   
    @Autowired
    private RedisConnectionFactory redisConectionFactory;
    
    /**
    * 定時任務
    */
    @Scheduled(cron = "0 0/1 * * * ? ")
    public void closeOrderTaskV1() {
        log.info(".........schedule task start.........");
        
        RedisConnection redisConnection = redisConectionFactory.getConnection();
        Field jedisField = ReflectionUtils.findField(JedisConnection.class, "jedis");
        Jedis jedis = (Jedis) ReflectionUtils.getField(jedisField, redisConnection);
       
        boolean locked = RedisPool.getDistributedLock(jedis, "lockKey", "requestId", 10*1000);
        if(locked) {
            sentMailToCustomer();
        }
        RedisPool.releaseDistributedLock(jedis, "", "");
        jedis.close();
        log.info(".........schedule task end.........");
    }
     
    public void sentMailToCustomer() {
        log.info(".........sent mail to customer.........");
    }
}
  

再也不用擔心,應用服務多台機器部署,每台機器都觸發的尷尬了。如果定時任務很多,最好的還是老老實實寫個任務調度中心,一來方便管理,二來方便維護。

補充部分:

 一些關於lua腳本的解釋

String script = "if redis.call('get',KEYS[1])== ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
Object result = jedis.eval(script,Collections.singletonList(lockKey),Collections.singletonList(requestId));        

如果一個請求更新緩存的時間比較長,甚至比鎖的有效期還要長,導致在緩存更新的過程中,鎖就失效了,此時另一個請求就會獲取鎖,但前一個請求在緩存更新完畢的時候,如果不加以判斷就直接刪除鎖,就會出現誤刪除其它請求創建的鎖的情況。

【end】

 

一點補充的話,寫完這篇博客后來看其他博客,也有一種redis鎖是關聯主機ip的,思路上是可行的,不失一個方法點,主要描述如下:

每個定時任務都在Redis中設置一個Key-Value,Key為自定義的每個定時任務的名字(如task1:redis:lock),Value為服務器Ip,同時設置合適的過期時間(例如設置為5min)。

每個節點在執行時,都要進行以下操作:

  • 1.是否存在Key,若不存在,則設置Key-Value,Value為當前節點的IP
  • 2.若存在Key,則比較Value是否是當前Ip,若是則繼續執行定時任務,若不是,則不往下執行。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM