寫在前面
redis辣么多數據結構,這么多命令,具體一點,都可以應用在什么場景呢?用來解決什么具體的問題?
分布式鎖
redis是網絡單線程的,它只有一個線程負責接受請求,這個特性即降低了redis本身的開發成本,也提高了redis的可用性。
分布式環境下,數據一致性問題一直是一個比較重要的話題,分布式與單機情況下最大的不同在於其不是多線程而是多進程。
多線程由於可以共享堆內存,因此可以簡單的采取內存作為標記存儲位置,例如cas,java的synchronize。而進程之間可能不在同一台物理機上,因此需要將標記存儲在一個所有進程都能看到的地方。
常見的場景,秒殺場景中的庫存超賣問題、多機定時任務的並發執行問題等。
庫存超賣問題
假如訂單服務部署了多個實例。
現在做一個商品秒殺活動,商品一共只有2個,同時購買的用戶則可能有幾千上萬。
理想狀態下第一個和第二個用戶能購買成功,其他用戶提示購買失敗,
實際可能出現的情況是,多個用戶都同時查到商品還沒賣完,第一個用戶買到,更新庫存之前,第二個用戶又下了訂單,導致出錯。
下面用java代碼做一個演示:
java實例都可以被正常運行在jdk1.8+,使用jedis連接redis實例
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* JedisPool連接
* @author taifeng zhang
* */
public class JedisPoolConnect {
public static JedisPool jedispool;
/**
* 連接並返回jedis實例
* */
public static Jedis connectJedis () {
if (jedispool == null) {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMinIdle(1);
jedisPoolConfig.setMaxIdle(10);
jedisPoolConfig.setTestOnBorrow(true);
jedispool = new JedisPool(jedisPoolConfig, "127.0.0.1", 6379);
}
return jedispool.getResource();
}
}
import redis.clients.jedis.*;
import redis.clients.jedis.Jedis;
/**
* 一個簡單的超賣演示程序
* */
public class MarketWrong {
public static String GOODS_LEN_KEY = "jedis:market:demo";
private final Integer DECR_THREAD_LEN = 16;
public void superMarket () {
// 開線程去減庫存
int i = DECR_THREAD_LEN;
while (i > 0) {
new Thread(() -> {
boolean hasGoods = true;
while (hasGoods) { // 當庫存大於0的時候
int goodsLen = getGoodsLen();
if (goodsLen > 0) {
decrGoodsLen(); // 一般進來之后就直接減去庫存了
System.out.println("現在庫存為" + getGoodsLen());
try {
Thread.sleep(100); //模擬中間處理流程
} catch (Exception e) {
System.out.println("執行減庫存錯誤" + e.getMessage() + e.getLocalizedMessage() + e.getStackTrace());
} finally {
// 最后邏輯
}
} else {
System.out.println("======賣完啦=======");
hasGoods = false;
}
}
}).start();
i--;
}
}
public void setGoodsLen (Integer len) {
Jedis jedis = JedisPoolConnect.connectJedis();
try {
jedis.set(GOODS_LEN_KEY, String.valueOf(len));
} finally {
jedis.close();
}
}
private Integer getGoodsLen () {
Jedis jedis = JedisPoolConnect.connectJedis();
try {
String val = jedis.get(GOODS_LEN_KEY);
if (val != null) {
return Integer.parseInt(val);
}
} finally {
jedis.close();
}
return 0;
}
private void decrGoodsLen () {
Jedis jedis = JedisPoolConnect.connectJedis();
try {
// 庫存減1
jedis.decr(GOODS_LEN_KEY);
} finally {
jedis.close();
}
}
}
用junit測試上面的代碼:
import org.junit.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class MarketWrongTests {
/**
* 測試超賣小程序
*/
@Test
public void superMarket () throws Exception {
MarketWrong marketWrong = new MarketWrong();
// 這次就賣500件吧
marketWrong.setGoodsLen(500);
marketWrong.superMarket();
Thread.sleep(60000); // 賣一分鍾
}
}
運行輸出,每次庫存都會變為負數,開了16個線程同時買東西:
// 省略了幾萬行 現在庫存為8 現在庫存為8 現在庫存為4 現在庫存為4 現在庫存為4 現在庫存為4 現在庫存為3 現在庫存為-5 現在庫存為-5 現在庫存為-5 現在庫存為-5 現在庫存為-5 現在庫存為-5 現在庫存為-5 現在庫存為-5 ======賣完啦======= ======賣完啦======= ======賣完啦=======
上面的代碼示例中,庫存數據是共享資源(存到redis了,相當於數據庫),面對高並發情形,需要保證對資源的訪問次序。在單機環境Java提供基於內存的鎖來處理並發問題,但是這些API在分布式場景中就無能為力了。也就是說單純的內存鎖並不能提供這種多機器並發服務的能力。分布式系統中,由於分布式系統的分布性,即多線程和多進程並且分布在不同機器中,synchronized和lock這兩種鎖將失去原有鎖的效果,需要我們自己實現分布式鎖。
也就是說庫存的遞減必須是順序的
常見的鎖方案如下:
基於數據庫實現分布式鎖 基於緩存,實現分布式鎖,如redis 基於Zookeeper實現分布式鎖
下面實現一個redis的鎖,剖析一把redis是如何實現分布式鎖的:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.params.SetParams;
import java.util.ArrayList;
import java.util.Arrays;
/**
* redis鎖實現
* @author taifeng zhang
* */
public class RedisLock {
private static String REDIS_LOCK_KEY = "redis:lock:key";
/**
*設置lockkey
* */
public static void setRedisLockKey(String redisLockKey) {
REDIS_LOCK_KEY = redisLockKey;
}
/**
* 嘗試獲取鎖
* @param ov 可以指定一個鎖標識,鎖的唯一值,區分每個鎖的所有者身份
* @param timeout 獲取鎖的超時時間
* */
public boolean tryLock (String ov, int timeout) {
Jedis jedis = JedisPoolConnect.connectJedis();
try {
// set nx ex
SetParams setParams = new SetParams();
setParams.nx();
setParams.ex(timeout);
Object val = jedis.set(REDIS_LOCK_KEY, ov, setParams); // set [key] nx ex [timeout]
return val != null;
} finally {
jedis.close();
}
}
/**
* 使用lua腳本釋放鎖
* @param ov 釋放之前先確定解鎖人的身份,所以要用到lua的原子特性
* */
public boolean tryUnlock (String ov) {
Jedis jedis = JedisPoolConnect.connectJedis();
try {
String DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL = "if (redis.call('get', KEYS[1]) == ARGV[1]) then return redis.call('del', KEYS[1]) else return 0 end";
String sha1 = jedis.scriptLoad(DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL);
String[] keys = {REDIS_LOCK_KEY};
String[] args = {ov};
Integer val = Integer.parseInt(jedis.evalsha(sha1,new ArrayList<>(Arrays.asList(keys)),new ArrayList<>(Arrays.asList(args))).toString());
return val > 0;
} finally {
jedis.close();
}
}
}
實現原則有幾點: 1、原子相關操作步驟必須全部包括在鎖內 2、每個鎖都有一個唯一的value,標識加鎖人的身份。 3、加超時時間防止死鎖 (超時時間要合理)
- 加鎖代碼解析
/**
* 嘗試獲取鎖
* @param ov 可以指定一個鎖標識,鎖的唯一值,區分每個鎖的所有者身份
* @param timeout 獲取鎖的超時時間
* */
public boolean tryLock (String ov, int timeout) {
Jedis jedis = JedisPoolConnect.connectJedis();
try {
// set nx ex
SetParams setParams = new SetParams();
setParams.nx();
setParams.ex(timeout);
Object val = jedis.set(REDIS_LOCK_KEY, ov, setParams); // 用 set [key] nx ex [timeout] 命令模擬加鎖
return val != null;
} finally {
jedis.close();
}
}
加鎖的代碼很簡單,其實就是利用redis命令 set [key] nx ex [timeout] 的特性,已有值的時候返回值為nil,如果執行這個命令的結果是null,那就可以認為資源已經被上鎖
同時,set也將REDIS_LOCK_KEY設置為一個唯一值,在解鎖的時候或者鎖重入的時候判斷身份使用。
- 解鎖代碼解析
/**
* 使用lua腳本釋放鎖
* @param ov 釋放之前先確定解鎖人的身份,所以要用到lua的原子特性
* */
public boolean tryUnlock (String ov) {
Jedis jedis = JedisPoolConnect.connectJedis();
try {
String DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL = "if (redis.call('get', KEYS[1]) == ARGV[1]) then return redis.call('del', KEYS[1]) else return 0 end";
String sha1 = jedis.scriptLoad(DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL);
String[] keys = {REDIS_LOCK_KEY};
String[] args = {ov};
Integer val = Integer.parseInt(jedis.evalsha(sha1,new ArrayList<>(Arrays.asList(keys)),new ArrayList<>(Arrays.asList(args))).toString());
return val > 0;
} finally {
jedis.close();
}
}
解鎖代碼的精髓是這句lua腳本:
if (redis.call('get', KEYS[1]) == ARGV[1]) then
return redis.call('del', KEYS[1])
else return 0
從redis讀取key的值,如果它等於傳入的唯一key,則可以釋放鎖,否則返回0
為什么要檢查唯一key再釋放鎖呢?主要是為了這么一個場景:
- A用戶來獲取了鎖
- B用戶來獲取鎖,鎖已經被a拿走了,等待鎖
- A用戶可能因為突然發生網絡延遲,超過了超時時間,這時候鎖因為超時自動釋放了。
- B用戶獲取了鎖
- A用戶這時候網絡恢復了。。。這時候A用戶要釋放鎖,如果釋放成功就會導致連鎖反應,b用戶被解鎖,b又可能去解鎖c
- 所以每次加鎖解鎖都需要驗證獲取鎖的用戶身份,一般存放在key的value里面,在釋放鎖之前先檢查,也就是 check and set
鎖的重入
上面談到,我們記錄了每個鎖的用戶身份,那是不是同一個用戶一次操作需要兩次鎖,是可以重用的呢?
答案是ok的
我們可以在trylock中加一個lua腳本用來先check 再 set,如果判斷check與用戶符合,則直接返回true就可以了。
public boolean tryLock (String ov, int timeout) {
Jedis jedis = JedisPoolConnect.connectJedis();
try {
// 加上鎖的重入特性
String DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL = "if (redis.call('get', KEYS[1]) == ARGV[1]) then return 1 else return 0 end"; // 如果當前鎖的值等於ov的話,認為來獲取鎖的還是同一個人
String sha1 = jedis.scriptLoad(DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL);
String[] keys = {REDIS_LOCK_KEY};
String[] args = {ov};
Integer val = Integer.parseInt(jedis.evalsha(sha1,new ArrayList<>(Arrays.asList(keys)),new ArrayList<>(Arrays.asList(args))).toString());
if (val > 0) { // 判定成功后,鎖就重入了,即無需第二次獲取鎖
return true;
}
// set nx ex
SetParams setParams = new SetParams();
setParams.nx();
setParams.ex(timeout);
Object val = jedis.set(REDIS_LOCK_KEY, ov, setParams); // set [key] nx ex [timeout]
return val != null;
} finally {
jedis.close();
}
}
最后我們看看關於超賣問題,我們將代碼加上鎖 注意兩個todo的地方。
import redis.clients.jedis.*;
import redis.clients.jedis.Jedis;
public class MarketWrong {
public static String GOODS_LEN_KEY = "jedis:market:demo";
private final Integer DECR_THREAD_LEN = 16;
RedisLock redisLock = new RedisLock();
public void superMarket () {
// 開線程去減庫存
int i = DECR_THREAD_LEN;
while (i > 0) {
int whilekey = i;
new Thread(() -> {
int n;
int j = 0;
boolean hasGoods = true;
while (hasGoods) { // 當庫存大於0的時候
String ov = whilekey + "-" + j;
// todo 加鎖
while (!redisLock.tryLock(ov, 20)) { // 如果獲取不到鎖就等待
}
int goodsLen = getGoodsLen();
if (goodsLen > 0) {
decrGoodsLen(); // 一般進來之后就直接減去庫存了
System.out.println("現在庫存為" + getGoodsLen());
redisLock.tryUnlock(ov); // todo 解除鎖
try {
Thread.sleep(100); //模擬中間處理流程
} catch (Exception e) {
System.out.println("執行減庫存錯誤" + e.getMessage() + e.getLocalizedMessage() + e.getStackTrace());
} finally {
// 最后邏輯
}
} else {
System.out.println("======賣完啦=======");
hasGoods = false;
}
j++; // 需要這個用來生成ov,相當於模擬每一個買家的id
}
}).start();
i--;
}
}
/**
* 一個簡單的超賣演示程序
* */
public void setGoodsLen (Integer len) {
Jedis jedis = JedisPoolConnect.connectJedis();
try {
jedis.set(GOODS_LEN_KEY, String.valueOf(len));
} finally {
jedis.close();
}
}
private Integer getGoodsLen () {
Jedis jedis = JedisPoolConnect.connectJedis();
try {
String val = jedis.get(GOODS_LEN_KEY);
if (val != null) {
return Integer.parseInt(val);
}
} finally {
jedis.close();
}
return 0;
}
private void decrGoodsLen () {
Jedis jedis = JedisPoolConnect.connectJedis();
try {
// 庫存減1
jedis.decr(GOODS_LEN_KEY);
} finally {
jedis.close();
}
}
}
加上鎖之后再測試,超賣問題已解決,注意現在的輸出是線性遞增的,因為開線程的模擬方式就是並發處理,每次16個線程幾乎是同時進行的,所以在沒有鎖的時候,並發讀取的goodslen很有可能都是16個線程一樣的。
所以redis的這個鎖的實現也叫: 分布式互斥鎖
現在庫存為8 現在庫存為7 現在庫存為6 現在庫存為5 現在庫存為4 現在庫存為3 現在庫存為2 現在庫存為1 現在庫存為0 ======賣完啦======= ======賣完啦======= ======賣完啦=======
redis實現的分布式互斥鎖並不完美,但在大多數應用場景下夠用了,另外還可以使用zookeeper甚至mysql來實現。
分布式定時任務問題
分布式場景下,還有另外一個問題--定時任務並發問題,當我們的應用采用分布式部署的時候,就必然會有各種定時任務被部署到不同的機器實例上,如果兩台機器同時運行同一個定時任務的話,任務就執行了兩次。
這個問題可能更復雜一點,僅僅是加一個鎖有可能會壞事兒,因為定時任務的多機分布會產生幾個需要解決的問題:
-
多台機器的時間一致性問題
如果多台機器的時區不一致,那鎖基本上無從談起了。 或者時區一致,但可能服務器時間相差幾秒鍾,那么也有可能導致鎖丟失。
-
鎖未釋放問題(服務器宕機怎么辦)
那么如果serverA在加鎖的過程中,出現宕機怎么辦,是否會一直處於加鎖狀態
-
命名空間問題
每個定時任務應該有不同的鎖命名,防止出現同名鎖。
還是讓我們看一個java代碼的例子 注意,redis連接和鎖代碼有復用上面一節的
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
@Component
@EnableScheduling
public class ScheduleDemo {
private String sourceKey = "redis:schedule:test:key";
private void sendEmail (String serviceKey) throws InterruptedException {
Jedis jedis = JedisPoolConnect.connectJedis();
try {
Integer sendPatch = 0; // 從redis讀取來模擬發送的批次
Object val = jedis.get(sourceKey);
if (val != null) {
sendPatch = Integer.parseInt(val.toString());
}
Thread.sleep(2000);
System.out.println("批次[" + sendPatch +"]====發送郵件====" + serviceKey);
jedis.incr(sourceKey); // 批次加1
} finally {
jedis.close();
}
}
// 模擬service
@Scheduled(cron = "0 27 09 * * ?") // 【cron改為后面的時間】
public void serviceA () throws InterruptedException {
this.sendEmail("service");
}
}
將這段代碼打開兩個實例運行【ps,你可以在idea中右上角直接配兩個config就可以了】
看運行結果:
郵件1被同時發送了兩次,這是不可接受的。
ok,有的同學現在就想到了,加個鎖就完事了
我們將發送代碼加上一個鎖解決這個問題:在sendmail里加一個redis分布式鎖
private void sendEmail (String serviceKey) throws InterruptedException {
if (!redisLock.tryLock(serviceKey, 30)) {
return; // todo 獲取不到鎖就取消,同一個定時任務只需要執行一次
}
Jedis jedis = JedisPoolConnect.connectJedis();
try {
Integer sendPatch = 0; // 從redis讀取來模擬發送的批次
Object val = jedis.get(sourceKey);
if (val != null) {
sendPatch = Integer.parseInt(val.toString());
}
Thread.sleep(2000);
System.out.println("批次[" + sendPatch +"]====發送郵件====" + serviceKey);
jedis.incr(sourceKey); // 批次加1
redisLock.tryUnlock(serviceKey); // todo 解鎖
} finally {
jedis.close();
}
}
如果獲取不到鎖,那么取消這個任務的執行,看起來很完美對不對?
實際上沒有解決的問題還有很多。
- 多個定時任務的多個並發執行sendmail,key如何保證唯一?
可以使用實例的ip+端口做唯一key,這樣能夠保證多個實例的唯一性
- 兩台服務器時間差超過30s怎么辦?
通過中間媒介來確定時間。或者在服務器中杜絕這個問題
- 最重要的問題還是在於,兩台服務器的時間有可能有細微差別,他們本身就有可能不是並發的
這一點在分布式定時任務領域里很重要。
僅僅是加了一個同步鎖是遠遠不夠的
解決方案可以是根據業務的不同來設置不同的鎖超時時間,例如某個業務定時任務,每天只可以執行一次,那么將超時時間設置為1個小時最保險,如果某個定時任務每分鍾執行,執行操作時間大約20s,那你可以將超時時間設置成30s。
另一個解決方案是設置一個統一的、中心級別的定時任務,任務負責派發消息,通過消息隊列的方式來做定時,這里就不細表,這種方式比較適合異構、或者跨網絡、跨機房級別的分布式。
可以對redis鎖做一次小小的改版升級,使用aop加注解來完成鎖的配置:
我們定義一個方法級別的aop注解
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* redis lock
* @author taifeng zhang
* */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedisLockAop {
String key();
/**
* 兩種類型可選
* wait = 等待鎖
* return = 取消執行
* */
String type() default "wait";
int timeout() default 30;
}
然后通過aop,去為加了注解的方法做鎖操作
import com.halfway.halfway.redis.RedisLock;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
/**
* redislock aop實現
* @author by taifeng zhang
* */
@Component
@Aspect
public class RedisLockAopAspect {
private RedisLock redisLock = new RedisLock();
@Around("@within(com.halfway.halfway.redis.lockAop.RedisLockAop) && @annotation(lock)")
public Object excuteAop (ProceedingJoinPoint pjp, RedisLockAop lock) throws Throwable {
if ("wait".equals(lock.type())) {
while (!redisLock.tryLock(lock.key(), lock.timeout())) {} // todo 等待鎖
} else if ("return".equals(lock.type())) {
if (!redisLock.tryLock(lock.key(), lock.timeout())) {
return null; // todo 取消執行
}
} else {
throw new NullPointerException("type只可以是wait或者return");
}
Object val = pjp.proceed();
redisLock.tryUnlock(lock.key());
return val;
}
}
這個方式的好處是鎖與代碼解耦,無需關注鎖的內部實現變化
@Scheduled(cron = "0/30 * * * * ?")
@RedisLockAop(key = "serviceIp:port", type="return", timeout=15)
public void serviceA () throws InterruptedException {
this.sendEmail("service");
}




