Redis實現的分布式鎖和分布式限流


  隨着現在分布式越來越普遍,分布式鎖也十分常用,我的上一篇文章解釋了使用zookeeper實現分布式鎖(傳送門),本次咱們說一下如何用Redis實現分布式鎖和分布限流。

  Redis有個事務鎖,就是如下的命令,這個命令的含義是將一個value設置到一個key中,如果不存在將會賦值並且設置超時時間為30秒,如何這個key已經存在了,則不進行設置。

SET key value NX PX 30000

  這個事務鎖很好的解決了兩個單獨的命令,一個設置set key value nx,即該key不存在的話將對其進行設置,另一個是expire key seconds,設置該key的超時時間。我們可以想一下,如果這兩個命令用程序單獨使用會存在什么問題:

  1. 如果一個set key的命令設置了key,然后程序異常了,expire時間沒有設置,那么這個key會一直鎖住。

  2. 如果一個set key時出現了異常,但是直接執行了expire,過了一會兒之后另一個進行set key,還沒怎么執行代碼,結果key過期了,別的線程也進入了鎖。

  還有很多出問題的可能點,這里我們就不討論了,下面咱們來看看如何實現吧。本文使用的Spring Boot 2.x + Spring data redis + Swagger +lombok + AOP + lua腳本。在實現的過程中遇到了很多問題,都一一解決實現了。依賴的POM文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hqs</groupId>
    <artifactId>distributedlock</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>distributedlock</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

  使用了兩個lua腳本,一個用於執行lock,另一個執行unlock。咱們簡單看一下,lock腳本就是采用Redis事務執行的set nx px命令,其實還有set nx ex命令,這個ex命令是采用秒的方式進行設置過期時間,這個px是采用毫秒的方式設置過期時間。value需要使用一個唯一的值,這個值在解鎖的時候需要判斷是否一致,如果一致的話就進行解鎖。這個也是官方推薦的方法。另外在lock的地方我設置了一個result,用於輸出測試時的結果,這樣就可以結合程序去進行debug了。

local expire = tonumber(ARGV[2])
local ret = redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', expire)
local strret = tostring(ret)
--用於查看結果,我本機獲取鎖成功后程序返回隨機結果"table: 0x7fb4b3700fe0",否則返回"false"
redis.call('set', 'result', strret)
if strret == 'false' then
    return false
else
    return true
end
redis.call('del', 'result')
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end

  來看下代碼,主要寫了兩個方法,一個是用與鎖另外一個是用於結解鎖。這塊需要注意的是使用RedisTemplate<String, String>,這塊意味着key和value一定都是String的,我在使用的過程中就出現了一些錯誤。首先初始化兩個腳本到程序中,然后調用執行腳本。

package com.hqs.distributedlock.lock;


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import java.util.Collections;

@Slf4j
@Component
public class DistributedLock {

    //注意RedisTemplate用的String,String,后續所有用到的key和value都是String的
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    RedisScript<Boolean> lockScript;

    @Autowired
    RedisScript<Long> unlockScript;

    public Boolean distributedLock(String key, String uuid, String secondsToLock) {
        Boolean locked = false;
        try {
            String millSeconds = String.valueOf(Integer.parseInt(secondsToLock) * 1000);
            locked =redisTemplate.execute(lockScript, Collections.singletonList(key), uuid, millSeconds);
            log.info("distributedLock.key{}: - uuid:{}: - timeToLock:{} - locked:{} - millSeconds:{}",
                    key, uuid, secondsToLock, locked, millSeconds);
        } catch (Exception e) {
            log.error("error", e);
        }
        return locked;
    }

    public void distributedUnlock(String key, String uuid) {
        Long unlocked = redisTemplate.execute(unlockScript, Collections.singletonList(key),
                uuid);
        log.info("distributedLock.key{}: - uuid:{}: - unlocked:{}", key, uuid, unlocked);

    }

}

  還有一個就是腳本定義的地方需要注意,返回的結果集一定是Long, Boolean,List, 一個反序列化的值。這塊要注意。

package com.hqs.distributedlock.config;


import com.sun.org.apache.xpath.internal.operations.Bool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.scripting.ScriptSource;
import org.springframework.scripting.support.ResourceScriptSource;


@Configuration
@Slf4j
public class BeanConfiguration {

    /**
     * The script resultType should be one of
     * Long, Boolean, List, or a deserialized value type. It can also be null if the script returns
     * a throw-away status (specifically, OK).
     * @return
     */
    @Bean
    public RedisScript<Long> limitScript() {
        RedisScript redisScript = null;
        try {
            ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/limit.lua"));
//            log.info("script:{}", scriptSource.getScriptAsString());
            redisScript = RedisScript.of(scriptSource.getScriptAsString(), Long.class);
        } catch (Exception e) {
            log.error("error", e);
        }
        return redisScript;

    }

    @Bean
    public RedisScript<Boolean> lockScript() {
        RedisScript<Boolean> redisScript = null;
        try {
            ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/lock.lua"));
            redisScript = RedisScript.of(scriptSource.getScriptAsString(), Boolean.class);
        } catch (Exception e) {
            log.error("error" , e);
        }
        return redisScript;
    }

    @Bean
    public RedisScript<Long> unlockScript() {
        RedisScript<Long> redisScript = null;
        try {
            ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/unlock.lua"));
            redisScript = RedisScript.of(scriptSource.getScriptAsString(), Long.class);
        } catch (Exception e) {
            log.error("error" , e);
        }
        return redisScript;
    }


    @Bean
    public RedisScript<Long> limitAnother() {
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("/scripts/limit.lua")));
        redisScript.setResultType(Long.class);
        return redisScript;
    }

}

  好了,這塊就寫好了,然后寫好controller類准備測試。

  

   @PostMapping("/distributedLock")
    @ResponseBody
    public String distributedLock(String key, String uuid, String secondsToLock, String userId) throws Exception{
//        String uuid = UUID.randomUUID().toString();
        Boolean locked = false;
        try {
            locked = lock.distributedLock(key, uuid, secondsToLock);
            if(locked) {
                log.info("userId:{} is locked - uuid:{}", userId, uuid);
                log.info("do business logic");
                TimeUnit.MICROSECONDS.sleep(3000);
            } else {
                log.info("userId:{} is not locked - uuid:{}", userId, uuid);
            }
        } catch (Exception e) {
            log.error("error", e);
        } finally {
            if(locked) {
                lock.distributedUnlock(key, uuid);
            }
        }

        return "ok";
    }

  我也寫了一個測試類,用於測試和輸出結果, 使用100個線程,然后鎖的時間設置10秒,controller里邊需要休眠3秒模擬業務執行。

    @Test
    public void distrubtedLock() {
        String url = "http://localhost:8080/distributedLock";
        String uuid = "abcdefg";
//        log.info("uuid:{}", uuid);
        String key = "redisLock";
        String secondsToLive = "10";

        for(int i = 0; i < 100; i++) {
            final int userId = i;
            new Thread(() -> {
                MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
                params.add("uuid", uuid);
                params.add("key", key);
                params.add("secondsToLock", secondsToLive);
                params.add("userId", String.valueOf(userId));
                String result = testRestTemplate.postForObject(url, params, String.class);
                System.out.println("-------------" + result);
            }
            ).start();
        }

    }

  獲取鎖的地方就會執行do business logic, 然后會有部分線程獲取到鎖並執行業務,執行完業務的就會釋放鎖。

  分布式鎖就實現好了,接下來實現分布式限流。先看一下limit的lua腳本,需要給腳本傳兩個值,一個值是限流的key,一個值是限流的數量。獲取當前key,然后判斷其值是否為nil,如果為nil的話需要賦值為0,然后進行加1並且和limit進行比對,如果大於limt即返回0,說明限流了,如果小於limit則需要使用Redis的INCRBY key 1,就是將key進行加1命令。並且設置超時時間,超時時間是秒,並且如果有需要的話這個秒也是可以用參數進行設置。

--lua 下標從 1 開始
-- 限流 key
local key = KEYS[1]
-- 限流大小
local limit = tonumber(ARGV[1])

-- 獲取當前流量大小
local curentLimit = tonumber(redis.call('get', key) or "0")

if curentLimit + 1 > limit then
    -- 達到限流大小 返回
    return 0;
else
    -- 沒有達到閾值 value + 1
    redis.call("INCRBY", key, 1)
    -- EXPIRE后邊的單位是秒
    redis.call("EXPIRE", key, 10)
    return curentLimit + 1
end

  執行limit的腳本和執行lock的腳本類似。

package com.hqs.distributedlock.limit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import java.util.Collections;

/**
 * @author huangqingshi
 * @Date 2019-01-17
 */
@Slf4j
@Component
public class DistributedLimit {

    //注意RedisTemplate用的String,String,后續所有用到的key和value都是String的
    @Autowired
    private RedisTemplate<String, String> redisTemplate;


    @Autowired
    RedisScript<Long> limitScript;

    public Boolean distributedLimit(String key, String limit) {
        Long id = 0L;

        try {
            id = redisTemplate.execute(limitScript, Collections.singletonList(key),
                    limit);
            log.info("id:{}", id);
        } catch (Exception e) {
            log.error("error", e);
        }

        if(id == 0L) {
            return false;
        } else {
            return true;
        }
    }

}

  接下來咱們寫一個限流注解,並且設置注解的key和限流的大小:

package com.hqs.distributedlock.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * 自定義limit注解
 * @author huangqingshi
 * @Date 2019-01-17
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistriLimitAnno {
    public String limitKey() default "limit";
    public int limit() default 1;
}

  然后對注解進行切面,在切面中判斷是否超過limit,如果超過limit的時候就需要拋出異常exceeded limit,否則正常執行。

package com.hqs.distributedlock.aspect;

import com.hqs.distributedlock.annotation.DistriLimitAnno;
import com.hqs.distributedlock.limit.DistributedLimit;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

/**
 * @author huangqingshi
 * @Date 2019-01-17
 */
@Slf4j
@Aspect
@Component
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class LimitAspect {

    @Autowired
    DistributedLimit distributedLimit;

    @Pointcut("@annotation(com.hqs.distributedlock.annotation.DistriLimitAnno)")
    public void limit() {};

    @Before("limit()")
    public void beforeLimit(JoinPoint joinPoint) throws Exception {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        DistriLimitAnno distriLimitAnno = method.getAnnotation(DistriLimitAnno.class);
        String key = distriLimitAnno.limitKey();
        int limit = distriLimitAnno.limit();
        Boolean exceededLimit = distributedLimit.distributedLimit(key, String.valueOf(limit));
        if(!exceededLimit) {
            throw new RuntimeException("exceeded limit");
        }
    }

}

  因為有拋出異常,這里我弄了一個統一的controller錯誤處理,如果controller出現Exception的時候都需要走這塊異常。如果是正常的RunTimeException的時候獲取一下,否則將異常獲取一下並且輸出。

package com.hqs.distributedlock.util;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.context.request.NativeWebRequest;

import javax.servlet.http.HttpServletRequest;
import java.util.HashMap;
import java.util.Map;

/**
 * @author huangqingshi
 * @Date 2019-01-17
 * 統一的controller錯誤處理
 */
@Slf4j
@ControllerAdvice
public class UnifiedErrorHandler {
    private static Map<String, String> res = new HashMap<>(2);

    @ExceptionHandler(value = Exception.class)
    @ResponseStatus(HttpStatus.OK)
    @ResponseBody
    public Object processException(HttpServletRequest req, Exception e) {
        res.put("url", req.getRequestURL().toString());

        if(e instanceof RuntimeException) {
            res.put("mess", e.getMessage());
        } else {
            res.put("mess", "sorry error happens");
        }
        return res;
    }

}

  好了,接下來將注解寫到自定義的controller上,limit的大小為10,也就是10秒鍾內限制10次訪問。

@PostMapping("/distributedLimit")
    @ResponseBody
    @DistriLimitAnno(limitKey="limit", limit = 10)
    public String distributedLimit(String userId) {
        log.info(userId);
        return "ok";
    }

  也是來一段Test方法來跑,老方式100個線程開始跑,只有10次,其他的都是limit。沒有問題。

  總結一下,這次實現采用了使用lua腳本和Redis實現了鎖和限流,但是真實使用的時候還需要多測試,另外如果此次Redis也是采用的單機實現方法,使用集群的時候可能需要改造一下。關於鎖這塊其實Reids他們自己也實現了RedLock, java實現的版本Redission。也有很多公司使用了,功能非常強大。各種場景下都用到了。

  如有問題,歡迎拍磚~

  

  

  

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM