使用Redis分布式鎖處理並發,解決超賣問題


一、synchronized處理並發

首先,synchronized的確是一個解決辦法,而且也很簡單,在方法前面加一個synchronized關鍵字。

但是通過壓測,發現請求變的很慢,因為:
synchronized就用一個鎖把這個方法鎖住了,每次訪問這個方法,只會有一個線程,所以這就是它導致慢的原因。通過這種方式,保證這個方法中的代碼都是單線程來處理,不會出什么問題。

同時,使用synchronized還是存在一些問題的,首先,它無法做到細粒度的控制,比如同一時間有秒殺A商品和B商品的請求,都進入到了這個方法,雖然秒殺A商品的人很多,但是秒殺B商品的人很少,但是即使是買B商品,進入到了這個方法,也會一樣的慢。

最重要的是,synchronized是jvm進程鎖,它只適合單點的情況。如果以后程序水平擴展了,弄了個集群,很顯然,負載均衡之后,不同的用戶看到的結果一定是五花八門的。

所以,還是使用更好的辦法,使用redis分布式鎖。

二、redis分布式鎖

1、兩個redis的命令

setnx key value 簡單來說,setnx就是,如果沒有這個key,那么就set一個key-value, 但是如果這個key已經存在,那么將不會再次設置,get出來的value還是最開始set進去的那個value.
網站中還專門講到可以使用!SETNX加鎖,如果獲得鎖,返回1,如果返回0,那么該鍵已經被其他的客戶端鎖定。
並且也提到了如何處理死鎖。

getset key value 這個就更簡單了,先通過key獲取value,然后再將新的value set進去。

2、redis分布式鎖的實現

我們希望的,無非就是這一段代碼,能夠單線程的去訪問,因此在這段代碼之前給他加鎖,相應的,這段代碼后面要給它解鎖:

 

 

2.1 引入redis依賴

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

  

2.2 配置redis

spring:
  redis:
    host: localhost
    port: 6379

  

package app;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.annotation.Bean;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.client.RestTemplate;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@SpringBootApplication
@EnableScheduling
public class Application {

   public static void main(String[] args) {
      SpringApplication.run(Application.class, args);
   }

   @Bean
   public TaskScheduler taskScheduler() {
      ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
      taskScheduler.setPoolSize(500);
      return taskScheduler;
   }
   @Bean
   public RestTemplate restTemplate(ClientHttpRequestFactory factory){
       return new RestTemplate(factory);
   }
    
   @Bean
   public ClientHttpRequestFactory simpleClientHttpRequestFactory(){
       SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
       factory.setReadTimeout(180000);//單位為ms1
       factory.setConnectTimeout(5000);//單位為ms
       return factory;
   }


}

  

2.3 編寫加鎖和解鎖的方法
package app.configuration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

    /**
    *feng
     */
@Component
public class RedisLock {

    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 加鎖
     * @param key   id
     * @param value 當前時間+超時時間
     * @return
     */
    public boolean lock(String key, String value) {
        if (redisTemplate.opsForValue().setIfAbsent(key, value)) {     //這個其實就是setnx命令,只不過在java這邊稍有變化,返回的是boolea
            return true;
        }

        //避免死鎖,且只讓一個線程拿到鎖
        String currentValue = redisTemplate.opsForValue().get(key);
        //如果鎖過期了
        if (!StringUtils.isEmpty(currentValue) && Long.parseLong(currentValue) < System.currentTimeMillis()) {
            //獲取上一個鎖的時間
            String oldValues = redisTemplate.opsForValue().getAndSet(key, value);

            /*
               只會讓一個線程拿到鎖
               如果舊的value和currentValue相等,只會有一個線程達成條件,因為第二個線程拿到的oldValue已經和currentValue不一樣了
             */
            if (!StringUtils.isEmpty(oldValues) && oldValues.equals(currentValue)) {
                return true;
            }
        }
        return false;
    }


    /**
     * 解鎖
     * @param key
     * @param value
     */
    public void unlock(String key, String value) {
        try {
            String currentValue = redisTemplate.opsForValue().get(key);
            if (!StringUtils.isEmpty(currentValue) && currentValue.equals(value)) {
                redisTemplate.opsForValue().getOperations().delete(key);
            }
        } catch (Exception e) {
            logger.error("『redis分布式鎖』解鎖異常,{}", e);
        }
    }
}    

  為什么要有避免死鎖的一步呢?
假設沒有『避免死鎖』這一步,結果在執行到下單代碼的時候出了問題,畢竟操作數據庫、網絡、io的時候拋了個異常,這個異常是偶然拋出來的,就那么偶爾一次,那么會導致解鎖步驟不去執行,這時候就沒有解鎖,后面的請求進來自然也或得不到鎖,這就被稱之為死鎖。
而這里的『避免死鎖』,就是給鎖加了一個過期時間,如果鎖超時了,就返回true,解開之前的那個死鎖。

2.4 下單代碼中引入加鎖和解鎖,確保只有一個線程操作

        @Scheduled(cron = "0 0 0/1 * * ?")
        public void findUserCenterSyncTrialMeg(){
            log.info("定時任務啟動");
            //加鎖
            long time = System.currentTimeMillis() + 1000*10;  //超時時間:10秒,最好設為常量
            boolean isLock = redisLock.lock(Const.SynchronousProbationaryEmployee.SYNCHRONOUS_PROBATIONARY_EMPLOYEE,
                    String.valueOf(time));
            try {
                if(isLock){
                    Thread.sleep ( 12000 );
                    syncUserCenterController.findUserCenterSyncTrial();

                    syncUserCenterController.findUserCenterSyncTrialOutsourcing();
                    log.info ( "執行成功---------------" );
                }else{
                    log.info ( "人太多了,換個姿勢再試試~---" );
                }
            } catch (InterruptedException e) {
                log.error("Redisson 分布式鎖獲取異常 ");
            } catch (Exception e) {
                log.error("程序獲取異常 ");
            } finally {
                if (!isLock) {
                    return;
                }
                //解鎖
                redisLock.unlock(Const.SynchronousProbationaryEmployee.SYNCHRONOUS_PROBATIONARY_EMPLOYEE, String.valueOf(time));
                log.error("Redisson 分布式鎖 解鎖 ");
            }
        }

  輸出完成,成功成功成功!!!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM