分布式鎖實現,與分布式定時任務


寫在前面

 

redis辣么多數據結構,這么多命令,具體一點,都可以應用在什么場景呢?用來解決什么具體的問題?

分布式鎖

redis是網絡單線程的,它只有一個線程負責接受請求,這個特性即降低了redis本身的開發成本,也提高了redis的可用性。

分布式環境下,數據一致性問題一直是一個比較重要的話題,分布式與單機情況下最大的不同在於其不是多線程而是多進程。

多線程由於可以共享堆內存,因此可以簡單的采取內存作為標記存儲位置,例如cas,java的synchronize。而進程之間可能不在同一台物理機上,因此需要將標記存儲在一個所有進程都能看到的地方。

常見的場景,秒殺場景中的庫存超賣問題、多機定時任務的並發執行問題等。

庫存超賣問題

假如訂單服務部署了多個實例。

現在做一個商品秒殺活動,商品一共只有2個,同時購買的用戶則可能有幾千上萬。

理想狀態下第一個和第二個用戶能購買成功,其他用戶提示購買失敗,

實際可能出現的情況是,多個用戶都同時查到商品還沒賣完,第一個用戶買到,更新庫存之前,第二個用戶又下了訂單,導致出錯。

下面用java代碼做一個演示:

java實例都可以被正常運行在jdk1.8+,使用jedis連接redis實例

 

 

復制代碼
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
 * JedisPool連接
 * @author taifeng zhang
 * */
public class JedisPoolConnect {
    public static JedisPool jedispool;

    /**
     * 連接並返回jedis實例
     * */
    public static Jedis connectJedis () {
        if (jedispool == null) {
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMinIdle(1);
            jedisPoolConfig.setMaxIdle(10);
            jedisPoolConfig.setTestOnBorrow(true);
            jedispool = new JedisPool(jedisPoolConfig, "127.0.0.1", 6379);
        }
        return jedispool.getResource();
    }
}
import redis.clients.jedis.*;
import redis.clients.jedis.Jedis;

/**
 *  一個簡單的超賣演示程序
 * */
public class MarketWrong {
    public static String GOODS_LEN_KEY = "jedis:market:demo";
    private final Integer DECR_THREAD_LEN = 16;

    public void superMarket () {

        // 開線程去減庫存
        int i = DECR_THREAD_LEN;
        while (i > 0) {

            new Thread(() -> {

                boolean hasGoods = true;
                while (hasGoods) { // 當庫存大於0的時候

                    int goodsLen = getGoodsLen();
                    if (goodsLen > 0) {
                        decrGoodsLen(); // 一般進來之后就直接減去庫存了


                        System.out.println("現在庫存為" + getGoodsLen());
                        try {
                            Thread.sleep(100); //模擬中間處理流程
                        } catch (Exception e) {
                            System.out.println("執行減庫存錯誤" + e.getMessage() + e.getLocalizedMessage() + e.getStackTrace());
                        } finally {
                            // 最后邏輯
                        }

                    } else {
                        System.out.println("======賣完啦=======");
                        hasGoods = false;
                    }

                }

            }).start();

            i--;
        }
    }

    public void setGoodsLen (Integer len) {
        Jedis jedis = JedisPoolConnect.connectJedis();
        try {
            jedis.set(GOODS_LEN_KEY, String.valueOf(len));
        } finally {
            jedis.close();
        }

    }

    private Integer getGoodsLen () {
        Jedis jedis = JedisPoolConnect.connectJedis();
        try {
            String val = jedis.get(GOODS_LEN_KEY);
            if (val != null) {
                return Integer.parseInt(val);
            }
        } finally {
            jedis.close();
        }
        return 0;
    }

    private void decrGoodsLen () {
        Jedis jedis = JedisPoolConnect.connectJedis();
        try {
            // 庫存減1
            jedis.decr(GOODS_LEN_KEY);
        } finally {
            jedis.close();
        }
    }
}
復制代碼

 

用junit測試上面的代碼:
復制代碼
import org.junit.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class MarketWrongTests {
    /**
     * 測試超賣小程序
     */
    @Test
    public void superMarket () throws Exception {
        MarketWrong marketWrong = new MarketWrong();
        // 這次就賣500件吧
        marketWrong.setGoodsLen(500);
        marketWrong.superMarket();
        Thread.sleep(60000); // 賣一分鍾
    }
}
復制代碼

 

運行輸出,每次庫存都會變為負數,開了16個線程同時買東西:

復制代碼
// 省略了幾萬行
現在庫存為8
現在庫存為8
現在庫存為4
現在庫存為4
現在庫存為4
現在庫存為4
現在庫存為3
現在庫存為-5
現在庫存為-5
現在庫存為-5
現在庫存為-5
現在庫存為-5
現在庫存為-5
現在庫存為-5
現在庫存為-5
======賣完啦=======
======賣完啦=======
======賣完啦=======
復制代碼

 

上面的代碼示例中,庫存數據是共享資源(存到redis了,相當於數據庫),面對高並發情形,需要保證對資源的訪問次序。在單機環境Java提供基於內存的鎖來處理並發問題,但是這些API在分布式場景中就無能為力了。也就是說單純的內存鎖並不能提供這種多機器並發服務的能力。分布式系統中,由於分布式系統的分布性,即多線程和多進程並且分布在不同機器中,synchronized和lock這兩種鎖將失去原有鎖的效果,需要我們自己實現分布式鎖。

也就是說庫存的遞減必須是順序的

常見的鎖方案如下:

基於數據庫實現分布式鎖 基於緩存,實現分布式鎖,如redis 基於Zookeeper實現分布式鎖

下面實現一個redis的鎖,剖析一把redis是如何實現分布式鎖的:

復制代碼
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.params.SetParams;

import java.util.ArrayList;
import java.util.Arrays;

/**
 * redis鎖實現
 * @author taifeng zhang
 * */
public class RedisLock {
    private static String REDIS_LOCK_KEY = "redis:lock:key";
    /**
     *設置lockkey
     * */
    public static void setRedisLockKey(String redisLockKey) {
        REDIS_LOCK_KEY = redisLockKey;
    }
    /**
     * 嘗試獲取鎖
     * @param ov 可以指定一個鎖標識,鎖的唯一值,區分每個鎖的所有者身份
     * @param timeout 獲取鎖的超時時間
     * */
    public boolean tryLock (String ov, int timeout) {
        Jedis jedis = JedisPoolConnect.connectJedis();
        try {
            // set nx ex
            SetParams setParams = new SetParams();
            setParams.nx();
            setParams.ex(timeout);
            Object val = jedis.set(REDIS_LOCK_KEY, ov, setParams); // set [key] nx ex [timeout]
            return val != null;
        } finally {
            jedis.close();
        }
    }
    /**
     * 使用lua腳本釋放鎖
     * @param ov 釋放之前先確定解鎖人的身份,所以要用到lua的原子特性
     * */
    public boolean tryUnlock (String ov) {
        Jedis jedis = JedisPoolConnect.connectJedis();
        try {
            String DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL = "if (redis.call('get', KEYS[1]) == ARGV[1]) then return redis.call('del', KEYS[1]) else return 0 end";
            String sha1 = jedis.scriptLoad(DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL);
            String[] keys = {REDIS_LOCK_KEY};
            String[] args = {ov};
            Integer val = Integer.parseInt(jedis.evalsha(sha1,new ArrayList<>(Arrays.asList(keys)),new ArrayList<>(Arrays.asList(args))).toString());
            return val > 0;
        } finally {
            jedis.close();
        }
    }
}
復制代碼

 

實現原則有幾點: 1、原子相關操作步驟必須全部包括在鎖內 2、每個鎖都有一個唯一的value,標識加鎖人的身份。 3、加超時時間防止死鎖 (超時時間要合理)

  • 加鎖代碼解析
復制代碼
/**
* 嘗試獲取鎖
* @param ov 可以指定一個鎖標識,鎖的唯一值,區分每個鎖的所有者身份
* @param timeout 獲取鎖的超時時間
* */
public boolean tryLock (String ov, int timeout) {
    Jedis jedis = JedisPoolConnect.connectJedis();
    try {
        // set nx ex
        SetParams setParams = new SetParams();
        setParams.nx();
        setParams.ex(timeout);
        Object val = jedis.set(REDIS_LOCK_KEY, ov, setParams); // 用 set [key] nx ex [timeout] 命令模擬加鎖
        return val != null;
    } finally {
        jedis.close();
    }
}
復制代碼

 

加鎖的代碼很簡單,其實就是利用redis命令 set [key] nx ex [timeout] 的特性,已有值的時候返回值為nil,如果執行這個命令的結果是null,那就可以認為資源已經被上鎖

同時,set也將REDIS_LOCK_KEY設置為一個唯一值,在解鎖的時候或者鎖重入的時候判斷身份使用。

  • 解鎖代碼解析
復制代碼
/**
* 使用lua腳本釋放鎖
* @param ov 釋放之前先確定解鎖人的身份,所以要用到lua的原子特性
* */
public boolean tryUnlock (String ov) {
    Jedis jedis = JedisPoolConnect.connectJedis();
    try {
        String DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL = "if (redis.call('get', KEYS[1]) == ARGV[1]) then return redis.call('del', KEYS[1]) else return 0 end";
        String sha1 = jedis.scriptLoad(DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL);
        String[] keys = {REDIS_LOCK_KEY};
        String[] args = {ov};
        Integer val = Integer.parseInt(jedis.evalsha(sha1,new ArrayList<>(Arrays.asList(keys)),new ArrayList<>(Arrays.asList(args))).toString());
        return val > 0;
    } finally {
        jedis.close();
    }
}
復制代碼

 

解鎖代碼的精髓是這句lua腳本:

if (redis.call('get', KEYS[1]) == ARGV[1]) then
    return redis.call('del', KEYS[1])
else return 0

 

從redis讀取key的值,如果它等於傳入的唯一key,則可以釋放鎖,否則返回0

為什么要檢查唯一key再釋放鎖呢?主要是為了這么一個場景:

  • A用戶來獲取了鎖
  • B用戶來獲取鎖,鎖已經被a拿走了,等待鎖
  • A用戶可能因為突然發生網絡延遲,超過了超時時間,這時候鎖因為超時自動釋放了。
  • B用戶獲取了鎖
  • A用戶這時候網絡恢復了。。。這時候A用戶要釋放鎖,如果釋放成功就會導致連鎖反應,b用戶被解鎖,b又可能去解鎖c
  • 所以每次加鎖解鎖都需要驗證獲取鎖的用戶身份,一般存放在key的value里面,在釋放鎖之前先檢查,也就是 check and set

鎖的重入

上面談到,我們記錄了每個鎖的用戶身份,那是不是同一個用戶一次操作需要兩次鎖,是可以重用的呢?

答案是ok的

我們可以在trylock中加一個lua腳本用來先check 再 set,如果判斷check與用戶符合,則直接返回true就可以了。

復制代碼
public boolean tryLock (String ov, int timeout) {
    Jedis jedis = JedisPoolConnect.connectJedis();
    try {
        // 加上鎖的重入特性
        String DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL = "if (redis.call('get', KEYS[1]) == ARGV[1]) then return 1 else return 0 end"; // 如果當前鎖的值等於ov的話,認為來獲取鎖的還是同一個人
        String sha1 = jedis.scriptLoad(DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL);
        String[] keys = {REDIS_LOCK_KEY};
        String[] args = {ov};
        Integer val = Integer.parseInt(jedis.evalsha(sha1,new ArrayList<>(Arrays.asList(keys)),new ArrayList<>(Arrays.asList(args))).toString());
        if (val > 0) { // 判定成功后,鎖就重入了,即無需第二次獲取鎖
            return true;
        }

        // set nx ex
        SetParams setParams = new SetParams();
        setParams.nx();
        setParams.ex(timeout);
        Object val = jedis.set(REDIS_LOCK_KEY, ov, setParams); // set [key] nx ex [timeout]
        return val != null;
    } finally {
        jedis.close();
    }
}
復制代碼

 

最后我們看看關於超賣問題,我們將代碼加上鎖 注意兩個todo的地方。

復制代碼
import redis.clients.jedis.*;
import redis.clients.jedis.Jedis;
public class MarketWrong {
    public static String GOODS_LEN_KEY = "jedis:market:demo";
    private final Integer DECR_THREAD_LEN = 16;
    RedisLock redisLock = new RedisLock();

    public void superMarket () {

        // 開線程去減庫存
        int i = DECR_THREAD_LEN;
        while (i > 0) {
            int whilekey = i;
            new Thread(() -> {
                int n;
                int j = 0;
                boolean hasGoods = true;
                while (hasGoods) { // 當庫存大於0的時候
                    String ov = whilekey + "-" + j;
                    // todo 加鎖
                    while (!redisLock.tryLock(ov, 20)) { // 如果獲取不到鎖就等待
                    }

                    int goodsLen = getGoodsLen();
                    if (goodsLen > 0) {
                        decrGoodsLen(); // 一般進來之后就直接減去庫存了
                        System.out.println("現在庫存為" + getGoodsLen());

                        redisLock.tryUnlock(ov); // todo 解除鎖

                        try {
                            Thread.sleep(100); //模擬中間處理流程
                        } catch (Exception e) {
                            System.out.println("執行減庫存錯誤" + e.getMessage() + e.getLocalizedMessage() + e.getStackTrace());
                        } finally {
                            // 最后邏輯
                        }

                    } else {
                        System.out.println("======賣完啦=======");
                        hasGoods = false;
                    }
                    j++; // 需要這個用來生成ov,相當於模擬每一個買家的id
                }
            }).start();
            i--;
        }
    }
    /**
     *  一個簡單的超賣演示程序
     * */
    public void setGoodsLen (Integer len) {
        Jedis jedis = JedisPoolConnect.connectJedis();
        try {
            jedis.set(GOODS_LEN_KEY, String.valueOf(len));
        } finally {
            jedis.close();
        }

    }

    private Integer getGoodsLen () {
        Jedis jedis = JedisPoolConnect.connectJedis();
        try {
            String val = jedis.get(GOODS_LEN_KEY);
            if (val != null) {
                return Integer.parseInt(val);
            }
        } finally {
            jedis.close();
        }
        return 0;
    }

    private void decrGoodsLen () {
        Jedis jedis = JedisPoolConnect.connectJedis();
        try {
            // 庫存減1
            jedis.decr(GOODS_LEN_KEY);
        } finally {
            jedis.close();
        }
    }
}
復制代碼

 

加上鎖之后再測試,超賣問題已解決,注意現在的輸出是線性遞增的,因為開線程的模擬方式就是並發處理,每次16個線程幾乎是同時進行的,所以在沒有鎖的時候,並發讀取的goodslen很有可能都是16個線程一樣的。

所以redis的這個鎖的實現也叫: 分布式互斥鎖

復制代碼
現在庫存為8
現在庫存為7
現在庫存為6
現在庫存為5
現在庫存為4
現在庫存為3
現在庫存為2
現在庫存為1
現在庫存為0
======賣完啦=======
======賣完啦=======
======賣完啦=======
復制代碼

 

redis實現的分布式互斥鎖並不完美,但在大多數應用場景下夠用了,另外還可以使用zookeeper甚至mysql來實現。

分布式定時任務問題

分布式場景下,還有另外一個問題--定時任務並發問題,當我們的應用采用分布式部署的時候,就必然會有各種定時任務被部署到不同的機器實例上,如果兩台機器同時運行同一個定時任務的話,任務就執行了兩次。

這個問題可能更復雜一點,僅僅是加一個鎖有可能會壞事兒,因為定時任務的多機分布會產生幾個需要解決的問題:

  • 多台機器的時間一致性問題

    如果多台機器的時區不一致,那鎖基本上無從談起了。 或者時區一致,但可能服務器時間相差幾秒鍾,那么也有可能導致鎖丟失。

  • 鎖未釋放問題(服務器宕機怎么辦)

    那么如果serverA在加鎖的過程中,出現宕機怎么辦,是否會一直處於加鎖狀態

  • 命名空間問題

    每個定時任務應該有不同的鎖命名,防止出現同名鎖。

還是讓我們看一個java代碼的例子 注意,redis連接和鎖代碼有復用上面一節的

復制代碼
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;

@Component
@EnableScheduling
public class ScheduleDemo {
    private String sourceKey = "redis:schedule:test:key";
    private void sendEmail (String serviceKey) throws InterruptedException {
        Jedis jedis = JedisPoolConnect.connectJedis();
        try {
            Integer sendPatch = 0; // 從redis讀取來模擬發送的批次
            Object val = jedis.get(sourceKey);
            if (val != null) {
                sendPatch = Integer.parseInt(val.toString());
            }

            Thread.sleep(2000);
            System.out.println("批次[" + sendPatch +"]====發送郵件====" + serviceKey);
            jedis.incr(sourceKey); // 批次加1
        } finally {
            jedis.close();
        }

    }

    // 模擬service
    @Scheduled(cron = "0 27 09 * * ?") // 【cron改為后面的時間】
    public void serviceA () throws InterruptedException {
        this.sendEmail("service");
    }
}
復制代碼

 

將這段代碼打開兩個實例運行【ps,你可以在idea中右上角直接配兩個config就可以了】

avatar

看運行結果:

avatar avatar

郵件1被同時發送了兩次,這是不可接受的。

ok,有的同學現在就想到了,加個鎖就完事了

我們將發送代碼加上一個鎖解決這個問題:在sendmail里加一個redis分布式鎖

復制代碼
private void sendEmail (String serviceKey) throws InterruptedException {

    if (!redisLock.tryLock(serviceKey, 30)) {
        return; // todo 獲取不到鎖就取消,同一個定時任務只需要執行一次
    }

    Jedis jedis = JedisPoolConnect.connectJedis();
    try {
        Integer sendPatch = 0; // 從redis讀取來模擬發送的批次
        Object val = jedis.get(sourceKey);
        if (val != null) {
            sendPatch = Integer.parseInt(val.toString());
        }

        Thread.sleep(2000);
        System.out.println("批次[" + sendPatch +"]====發送郵件====" + serviceKey);
        jedis.incr(sourceKey); // 批次加1

        redisLock.tryUnlock(serviceKey); // todo 解鎖

    } finally {
        jedis.close();
    }

}
復制代碼

 

如果獲取不到鎖,那么取消這個任務的執行,看起來很完美對不對?

實際上沒有解決的問題還有很多。

  • 多個定時任務的多個並發執行sendmail,key如何保證唯一?

可以使用實例的ip+端口做唯一key,這樣能夠保證多個實例的唯一性

  • 兩台服務器時間差超過30s怎么辦?

通過中間媒介來確定時間。或者在服務器中杜絕這個問題

  • 最重要的問題還是在於,兩台服務器的時間有可能有細微差別,他們本身就有可能不是並發的

這一點在分布式定時任務領域里很重要。

僅僅是加了一個同步鎖是遠遠不夠的

解決方案可以是根據業務的不同來設置不同的鎖超時時間,例如某個業務定時任務,每天只可以執行一次,那么將超時時間設置為1個小時最保險,如果某個定時任務每分鍾執行,執行操作時間大約20s,那你可以將超時時間設置成30s。

另一個解決方案是設置一個統一的、中心級別的定時任務,任務負責派發消息,通過消息隊列的方式來做定時,這里就不細表,這種方式比較適合異構、或者跨網絡、跨機房級別的分布式。

可以對redis鎖做一次小小的改版升級,使用aop加注解來完成鎖的配置:

我們定義一個方法級別的aop注解

復制代碼
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * redis lock
 * @author taifeng zhang
 * */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedisLockAop {
    String key();
    /**
     * 兩種類型可選
     * wait = 等待鎖
     * return = 取消執行
     * */
    String type() default "wait";
    int timeout() default 30;
}
復制代碼

 

然后通過aop,去為加了注解的方法做鎖操作

復制代碼
import com.halfway.halfway.redis.RedisLock;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * redislock aop實現
 * @author by taifeng zhang
 * */
@Component
@Aspect
public class RedisLockAopAspect {
    private RedisLock redisLock = new RedisLock();
    @Around("@within(com.halfway.halfway.redis.lockAop.RedisLockAop) && @annotation(lock)")
    public Object excuteAop (ProceedingJoinPoint pjp, RedisLockAop lock) throws Throwable {
        if ("wait".equals(lock.type())) {
            while (!redisLock.tryLock(lock.key(), lock.timeout())) {} // todo 等待鎖
        } else if ("return".equals(lock.type())) {
            if (!redisLock.tryLock(lock.key(), lock.timeout())) {
                return null; // todo 取消執行
            }
        } else {
            throw new NullPointerException("type只可以是wait或者return");
        }
        Object val = pjp.proceed();
        redisLock.tryUnlock(lock.key());
        return val;
    }
}
復制代碼

 

這個方式的好處是鎖與代碼解耦,無需關注鎖的內部實現變化

@Scheduled(cron = "0/30 * * * * ?")
@RedisLockAop(key = "serviceIp:port", type="return", timeout=15)
public void serviceA () throws InterruptedException {
    this.sendEmail("service");
}

 

...持續更新

 

github: https://github.com/294678380/redis-lerning


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM