一、使用Apache ab模擬並發壓測
1、壓測工具介紹
$ ab -n 100 -c 100 http://www.baidu.com/
-n
表示發出100個請求,-c
模擬100個並發,相當是100個人同時訪問。
還可以這樣寫:
$ ab -t 60 -c 100 http://www.baidu.com/
-t
表示60秒,-c
是100個並發,會在連續60秒內不停的發出請求。
使用ab工具模擬多線程並發請求,對發出負載的機器要求比較低,既不會占用很多cpu,也不會占用很多的內存,因此也是很多DDoS攻擊的必備良葯,不過要慎用,別耗光自己機器的資源。通常來說1000個請求,100個並發算是比較正常的模擬。
至於工具的使用,具體見:Apache ab 測試工具使用(一)
下載后,進入support
文件夾,執行命令。
2、並發測試
我創建了兩張表,一個商品表,一個訂單記錄表;
然后寫了兩個接口,一個是查詢商品信息,一個是下單秒殺。
查詢訂單:
秒殺下單:
當我並發測試時:
$ ab -n 500 -c 100 http://localhost:8080/seckill/1/
這TM肯定不行啊,這就超賣了,明明沒這么多商品,結果還賣出去了。。。
二、synchronized處理並發
首先,synchronized
的確是一個解決辦法,而且也很簡單,在方法前面加一個synchronized
關鍵字。
但是通過壓測,發現請求變的很慢,因為:
synchronized
就用一個鎖把這個方法鎖住了,每次訪問這個方法,只會有一個線程,所以這就是它導致慢的原因。通過這種方式,保證這個方法中的代碼都是單線程來處理,不會出什么問題。
同時,使用synchronized
還是存在一些問題的,首先,它無法做到細粒度的控制,比如同一時間有秒殺A商品和B商品的請求,都進入到了這個方法,雖然秒殺A商品的人很多,但是秒殺B商品的人很少,但是即使是買B商品,進入到了這個方法,也會一樣的慢。
最重要的是,它只適合單點的情況。如果以后程序水平擴展了,弄了個集群,很顯然,負載均衡之后,不同的用戶看到的結果一定是五花八門的。
所以,還是使用更好的辦法,使用redis分布式鎖。
三、redis分布式鎖
1、兩個redis的命令
setnx key value
簡單來說,setnx
就是,如果沒有這個key
,那么就set一個key-value, 但是如果這個key
已經存在,那么將不會再次設置,get出來的value還是最開始set進去的那個value.
網站中還專門講到可以使用!SETNX
加鎖,如果獲得鎖,返回1,如果返回0,那么該鍵已經被其他的客戶端鎖定。
並且也提到了如何處理死鎖。
getset key value
這個就更簡單了,先通過key獲取value,然后再將新的value set進去。
2、redis分布式鎖的實現
我們希望的,無非就是這一段代碼,能夠單線程的去訪問,因此在這段代碼之前給他加鎖,相應的,這段代碼后面要給它解鎖:
2.1 引入redis依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2.2 配置redis
spring:
redis:
host: localhost
port: 6379
2.3 編寫加鎖和解鎖的方法
package com.vito.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
/**
* Created by VitoYi on 2018/4/5.
*/
@Component
public class RedisLock {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 加鎖
* @param key 商品id
* @param value 當前時間+超時時間
* @return
*/
public boolean lock(String key, String value) {
if (redisTemplate.opsForValue().setIfAbsent(key, value)) { //這個其實就是setnx命令,只不過在java這邊稍有變化,返回的是boolea
return true;
}
//避免死鎖,且只讓一個線程拿到鎖
String currentValue = redisTemplate.opsForValue().get(key);
//如果鎖過期了
if (!StringUtils.isEmpty(currentValue) && Long.parseLong(currentValue) < System.currentTimeMillis()) {
//獲取上一個鎖的時間
String oldValues = redisTemplate.opsForValue().getAndSet(key, value);
/*
只會讓一個線程拿到鎖
如果舊的value和currentValue相等,只會有一個線程達成條件,因為第二個線程拿到的oldValue已經和currentValue不一樣了
*/
if (!StringUtils.isEmpty(oldValues) && oldValues.equals(currentValue)) {
return true;
}
}
return false;
}
/**
* 解鎖
* @param key
* @param value
*/
public void unlock(String key, String value) {
try {
String currentValue = redisTemplate.opsForValue().get(key);
if (!StringUtils.isEmpty(currentValue) && currentValue.equals(value)) {
redisTemplate.opsForValue().getOperations().delete(key);
}
} catch (Exception e) {
logger.error("『redis分布式鎖』解鎖異常,{}", e);
}
}
}
為什么要有避免死鎖的一步呢?
假設沒有『避免死鎖』這一步,結果在執行到下單代碼的時候出了問題,畢竟操作數據庫、網絡、io的時候拋了個異常,這個異常是偶然拋出來的,就那么偶爾一次,那么會導致解鎖步驟不去執行,這時候就沒有解鎖,后面的請求進來自然也或得不到鎖,這就被稱之為死鎖。
而這里的『避免死鎖』,就是給鎖加了一個過期時間,如果鎖超時了,就返回true
,解開之前的那個死鎖。
2.4 下單代碼中引入加鎖和解鎖,確保只有一個線程操作
@Autowired
private RedisLock redisLock;
@Override
@Transactional
public String seckill(Integer id)throws RuntimeException {
//加鎖
long time = System.currentTimeMillis() + 1000*10; //超時時間:10秒,最好設為常量
boolean isLock = redisLock.lock(String.valueOf(id), String.valueOf(time));
if(!isLock){
throw new RuntimeException("人太多了,換個姿勢再試試~");
}
//查庫存
Product product = productMapper.findById(id);
if(product.getStock()==0) throw new RuntimeException("已經賣光");
//寫入訂單表
Order order=new Order();
order.setProductId(product.getId());
order.setProductName(product.getName());
orderMapper.add(order);
//減庫存
product.setPrice(null);
product.setName(null);
product.setStock(product.getStock()-1);
productMapper.update(product);
//解鎖
redisLock.unlock(String.valueOf(id),String.valueOf(time));
return findProductInfo(id);
}
這樣再來跑幾次壓測,就不會超賣了: