寫在前面
redis辣么多數據結構,這么多命令,具體一點,都可以應用在什么場景呢?用來解決什么具體的問題?
分布式鎖
redis是網絡單線程的,它只有一個線程負責接受請求,這個特性即降低了redis本身的開發成本,也提高了redis的可用性。
分布式環境下,數據一致性問題一直是一個比較重要的話題,分布式與單機情況下最大的不同在於其不是多線程而是多進程。
多線程由於可以共享堆內存,因此可以簡單的采取內存作為標記存儲位置,例如cas,java的synchronize。而進程之間可能不在同一台物理機上,因此需要將標記存儲在一個所有進程都能看到的地方。
常見的場景,秒殺場景中的庫存超賣問題、多機定時任務的並發執行問題等。
庫存超賣問題
假如訂單服務部署了多個實例。
現在做一個商品秒殺活動,商品一共只有2個,同時購買的用戶則可能有幾千上萬。
理想狀態下第一個和第二個用戶能購買成功,其他用戶提示購買失敗,
實際可能出現的情況是,多個用戶都同時查到商品還沒賣完,第一個用戶買到,更新庫存之前,第二個用戶又下了訂單,導致出錯。
下面用java代碼做一個演示:
java實例都可以被正常運行在jdk1.8+,使用jedis連接redis實例
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; /** * JedisPool連接 * @author taifeng zhang * */ public class JedisPoolConnect { public static JedisPool jedispool; /** * 連接並返回jedis實例 * */ public static Jedis connectJedis () { if (jedispool == null) { JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMinIdle(1); jedisPoolConfig.setMaxIdle(10); jedisPoolConfig.setTestOnBorrow(true); jedispool = new JedisPool(jedisPoolConfig, "127.0.0.1", 6379); } return jedispool.getResource(); } } import redis.clients.jedis.*; import redis.clients.jedis.Jedis; /** * 一個簡單的超賣演示程序 * */ public class MarketWrong { public static String GOODS_LEN_KEY = "jedis:market:demo"; private final Integer DECR_THREAD_LEN = 16; public void superMarket () { // 開線程去減庫存 int i = DECR_THREAD_LEN; while (i > 0) { new Thread(() -> { boolean hasGoods = true; while (hasGoods) { // 當庫存大於0的時候 int goodsLen = getGoodsLen(); if (goodsLen > 0) { decrGoodsLen(); // 一般進來之后就直接減去庫存了 System.out.println("現在庫存為" + getGoodsLen()); try { Thread.sleep(100); //模擬中間處理流程 } catch (Exception e) { System.out.println("執行減庫存錯誤" + e.getMessage() + e.getLocalizedMessage() + e.getStackTrace()); } finally { // 最后邏輯 } } else { System.out.println("======賣完啦======="); hasGoods = false; } } }).start(); i--; } } public void setGoodsLen (Integer len) { Jedis jedis = JedisPoolConnect.connectJedis(); try { jedis.set(GOODS_LEN_KEY, String.valueOf(len)); } finally { jedis.close(); } } private Integer getGoodsLen () { Jedis jedis = JedisPoolConnect.connectJedis(); try { String val = jedis.get(GOODS_LEN_KEY); if (val != null) { return Integer.parseInt(val); } } finally { jedis.close(); } return 0; } private void decrGoodsLen () { Jedis jedis = JedisPoolConnect.connectJedis(); try { // 庫存減1 jedis.decr(GOODS_LEN_KEY); } finally { jedis.close(); } } }
用junit測試上面的代碼:
import org.junit.Test; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest public class MarketWrongTests { /** * 測試超賣小程序 */ @Test public void superMarket () throws Exception { MarketWrong marketWrong = new MarketWrong(); // 這次就賣500件吧 marketWrong.setGoodsLen(500); marketWrong.superMarket(); Thread.sleep(60000); // 賣一分鍾 } }
運行輸出,每次庫存都會變為負數,開了16個線程同時買東西:
// 省略了幾萬行 現在庫存為8 現在庫存為8 現在庫存為4 現在庫存為4 現在庫存為4 現在庫存為4 現在庫存為3 現在庫存為-5 現在庫存為-5 現在庫存為-5 現在庫存為-5 現在庫存為-5 現在庫存為-5 現在庫存為-5 現在庫存為-5 ======賣完啦======= ======賣完啦======= ======賣完啦=======
上面的代碼示例中,庫存數據是共享資源(存到redis了,相當於數據庫),面對高並發情形,需要保證對資源的訪問次序。在單機環境Java提供基於內存的鎖來處理並發問題,但是這些API在分布式場景中就無能為力了。也就是說單純的內存鎖並不能提供這種多機器並發服務的能力。分布式系統中,由於分布式系統的分布性,即多線程和多進程並且分布在不同機器中,synchronized和lock這兩種鎖將失去原有鎖的效果,需要我們自己實現分布式鎖。
也就是說庫存的遞減必須是順序的
常見的鎖方案如下:
基於數據庫實現分布式鎖 基於緩存,實現分布式鎖,如redis 基於Zookeeper實現分布式鎖
下面實現一個redis的鎖,剖析一把redis是如何實現分布式鎖的:
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import redis.clients.jedis.params.SetParams; import java.util.ArrayList; import java.util.Arrays; /** * redis鎖實現 * @author taifeng zhang * */ public class RedisLock { private static String REDIS_LOCK_KEY = "redis:lock:key"; /** *設置lockkey * */ public static void setRedisLockKey(String redisLockKey) { REDIS_LOCK_KEY = redisLockKey; } /** * 嘗試獲取鎖 * @param ov 可以指定一個鎖標識,鎖的唯一值,區分每個鎖的所有者身份 * @param timeout 獲取鎖的超時時間 * */ public boolean tryLock (String ov, int timeout) { Jedis jedis = JedisPoolConnect.connectJedis(); try { // set nx ex SetParams setParams = new SetParams(); setParams.nx(); setParams.ex(timeout); Object val = jedis.set(REDIS_LOCK_KEY, ov, setParams); // set [key] nx ex [timeout] return val != null; } finally { jedis.close(); } } /** * 使用lua腳本釋放鎖 * @param ov 釋放之前先確定解鎖人的身份,所以要用到lua的原子特性 * */ public boolean tryUnlock (String ov) { Jedis jedis = JedisPoolConnect.connectJedis(); try { String DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL = "if (redis.call('get', KEYS[1]) == ARGV[1]) then return redis.call('del', KEYS[1]) else return 0 end"; String sha1 = jedis.scriptLoad(DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL); String[] keys = {REDIS_LOCK_KEY}; String[] args = {ov}; Integer val = Integer.parseInt(jedis.evalsha(sha1,new ArrayList<>(Arrays.asList(keys)),new ArrayList<>(Arrays.asList(args))).toString()); return val > 0; } finally { jedis.close(); } } }
實現原則有幾點: 1、原子相關操作步驟必須全部包括在鎖內 2、每個鎖都有一個唯一的value,標識加鎖人的身份。 3、加超時時間防止死鎖 (超時時間要合理)
- 加鎖代碼解析
/** * 嘗試獲取鎖 * @param ov 可以指定一個鎖標識,鎖的唯一值,區分每個鎖的所有者身份 * @param timeout 獲取鎖的超時時間 * */ public boolean tryLock (String ov, int timeout) { Jedis jedis = JedisPoolConnect.connectJedis(); try { // set nx ex SetParams setParams = new SetParams(); setParams.nx(); setParams.ex(timeout); Object val = jedis.set(REDIS_LOCK_KEY, ov, setParams); // 用 set [key] nx ex [timeout] 命令模擬加鎖 return val != null; } finally { jedis.close(); } }
加鎖的代碼很簡單,其實就是利用redis命令 set [key] nx ex [timeout] 的特性,已有值的時候返回值為nil,如果執行這個命令的結果是null,那就可以認為資源已經被上鎖
同時,set也將REDIS_LOCK_KEY設置為一個唯一值,在解鎖的時候或者鎖重入的時候判斷身份使用。
- 解鎖代碼解析
/** * 使用lua腳本釋放鎖 * @param ov 釋放之前先確定解鎖人的身份,所以要用到lua的原子特性 * */ public boolean tryUnlock (String ov) { Jedis jedis = JedisPoolConnect.connectJedis(); try { String DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL = "if (redis.call('get', KEYS[1]) == ARGV[1]) then return redis.call('del', KEYS[1]) else return 0 end"; String sha1 = jedis.scriptLoad(DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL); String[] keys = {REDIS_LOCK_KEY}; String[] args = {ov}; Integer val = Integer.parseInt(jedis.evalsha(sha1,new ArrayList<>(Arrays.asList(keys)),new ArrayList<>(Arrays.asList(args))).toString()); return val > 0; } finally { jedis.close(); } }
解鎖代碼的精髓是這句lua腳本:
if (redis.call('get', KEYS[1]) == ARGV[1]) then return redis.call('del', KEYS[1]) else return 0
從redis讀取key的值,如果它等於傳入的唯一key,則可以釋放鎖,否則返回0
為什么要檢查唯一key再釋放鎖呢?主要是為了這么一個場景:
- A用戶來獲取了鎖
- B用戶來獲取鎖,鎖已經被a拿走了,等待鎖
- A用戶可能因為突然發生網絡延遲,超過了超時時間,這時候鎖因為超時自動釋放了。
- B用戶獲取了鎖
- A用戶這時候網絡恢復了。。。這時候A用戶要釋放鎖,如果釋放成功就會導致連鎖反應,b用戶被解鎖,b又可能去解鎖c
- 所以每次加鎖解鎖都需要驗證獲取鎖的用戶身份,一般存放在key的value里面,在釋放鎖之前先檢查,也就是 check and set
鎖的重入
上面談到,我們記錄了每個鎖的用戶身份,那是不是同一個用戶一次操作需要兩次鎖,是可以重用的呢?
答案是ok的
我們可以在trylock中加一個lua腳本用來先check 再 set,如果判斷check與用戶符合,則直接返回true就可以了。
public boolean tryLock (String ov, int timeout) { Jedis jedis = JedisPoolConnect.connectJedis(); try { // 加上鎖的重入特性 String DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL = "if (redis.call('get', KEYS[1]) == ARGV[1]) then return 1 else return 0 end"; // 如果當前鎖的值等於ov的話,認為來獲取鎖的還是同一個人 String sha1 = jedis.scriptLoad(DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL); String[] keys = {REDIS_LOCK_KEY}; String[] args = {ov}; Integer val = Integer.parseInt(jedis.evalsha(sha1,new ArrayList<>(Arrays.asList(keys)),new ArrayList<>(Arrays.asList(args))).toString()); if (val > 0) { // 判定成功后,鎖就重入了,即無需第二次獲取鎖 return true; } // set nx ex SetParams setParams = new SetParams(); setParams.nx(); setParams.ex(timeout); Object val = jedis.set(REDIS_LOCK_KEY, ov, setParams); // set [key] nx ex [timeout] return val != null; } finally { jedis.close(); } }
最后我們看看關於超賣問題,我們將代碼加上鎖 注意兩個todo的地方。
import redis.clients.jedis.*; import redis.clients.jedis.Jedis; public class MarketWrong { public static String GOODS_LEN_KEY = "jedis:market:demo"; private final Integer DECR_THREAD_LEN = 16; RedisLock redisLock = new RedisLock(); public void superMarket () { // 開線程去減庫存 int i = DECR_THREAD_LEN; while (i > 0) { int whilekey = i; new Thread(() -> { int n; int j = 0; boolean hasGoods = true; while (hasGoods) { // 當庫存大於0的時候 String ov = whilekey + "-" + j; // todo 加鎖 while (!redisLock.tryLock(ov, 20)) { // 如果獲取不到鎖就等待 } int goodsLen = getGoodsLen(); if (goodsLen > 0) { decrGoodsLen(); // 一般進來之后就直接減去庫存了 System.out.println("現在庫存為" + getGoodsLen()); redisLock.tryUnlock(ov); // todo 解除鎖 try { Thread.sleep(100); //模擬中間處理流程 } catch (Exception e) { System.out.println("執行減庫存錯誤" + e.getMessage() + e.getLocalizedMessage() + e.getStackTrace()); } finally { // 最后邏輯 } } else { System.out.println("======賣完啦======="); hasGoods = false; } j++; // 需要這個用來生成ov,相當於模擬每一個買家的id } }).start(); i--; } } /** * 一個簡單的超賣演示程序 * */ public void setGoodsLen (Integer len) { Jedis jedis = JedisPoolConnect.connectJedis(); try { jedis.set(GOODS_LEN_KEY, String.valueOf(len)); } finally { jedis.close(); } } private Integer getGoodsLen () { Jedis jedis = JedisPoolConnect.connectJedis(); try { String val = jedis.get(GOODS_LEN_KEY); if (val != null) { return Integer.parseInt(val); } } finally { jedis.close(); } return 0; } private void decrGoodsLen () { Jedis jedis = JedisPoolConnect.connectJedis(); try { // 庫存減1 jedis.decr(GOODS_LEN_KEY); } finally { jedis.close(); } } }
加上鎖之后再測試,超賣問題已解決,注意現在的輸出是線性遞增的,因為開線程的模擬方式就是並發處理,每次16個線程幾乎是同時進行的,所以在沒有鎖的時候,並發讀取的goodslen很有可能都是16個線程一樣的。
所以redis的這個鎖的實現也叫: 分布式互斥鎖
現在庫存為8 現在庫存為7 現在庫存為6 現在庫存為5 現在庫存為4 現在庫存為3 現在庫存為2 現在庫存為1 現在庫存為0 ======賣完啦======= ======賣完啦======= ======賣完啦=======
redis實現的分布式互斥鎖並不完美,但在大多數應用場景下夠用了,另外還可以使用zookeeper甚至mysql來實現。
分布式定時任務問題
分布式場景下,還有另外一個問題--定時任務並發問題,當我們的應用采用分布式部署的時候,就必然會有各種定時任務被部署到不同的機器實例上,如果兩台機器同時運行同一個定時任務的話,任務就執行了兩次。
這個問題可能更復雜一點,僅僅是加一個鎖有可能會壞事兒,因為定時任務的多機分布會產生幾個需要解決的問題:
-
多台機器的時間一致性問題
如果多台機器的時區不一致,那鎖基本上無從談起了。 或者時區一致,但可能服務器時間相差幾秒鍾,那么也有可能導致鎖丟失。
-
鎖未釋放問題(服務器宕機怎么辦)
那么如果serverA在加鎖的過程中,出現宕機怎么辦,是否會一直處於加鎖狀態
-
命名空間問題
每個定時任務應該有不同的鎖命名,防止出現同名鎖。
還是讓我們看一個java代碼的例子 注意,redis連接和鎖代碼有復用上面一節的
import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; @Component @EnableScheduling public class ScheduleDemo { private String sourceKey = "redis:schedule:test:key"; private void sendEmail (String serviceKey) throws InterruptedException { Jedis jedis = JedisPoolConnect.connectJedis(); try { Integer sendPatch = 0; // 從redis讀取來模擬發送的批次 Object val = jedis.get(sourceKey); if (val != null) { sendPatch = Integer.parseInt(val.toString()); } Thread.sleep(2000); System.out.println("批次[" + sendPatch +"]====發送郵件====" + serviceKey); jedis.incr(sourceKey); // 批次加1 } finally { jedis.close(); } } // 模擬service @Scheduled(cron = "0 27 09 * * ?") // 【cron改為后面的時間】 public void serviceA () throws InterruptedException { this.sendEmail("service"); } }
將這段代碼打開兩個實例運行【ps,你可以在idea中右上角直接配兩個config就可以了】
看運行結果:
郵件1被同時發送了兩次,這是不可接受的。
ok,有的同學現在就想到了,加個鎖就完事了
我們將發送代碼加上一個鎖解決這個問題:在sendmail里加一個redis分布式鎖
private void sendEmail (String serviceKey) throws InterruptedException { if (!redisLock.tryLock(serviceKey, 30)) { return; // todo 獲取不到鎖就取消,同一個定時任務只需要執行一次 } Jedis jedis = JedisPoolConnect.connectJedis(); try { Integer sendPatch = 0; // 從redis讀取來模擬發送的批次 Object val = jedis.get(sourceKey); if (val != null) { sendPatch = Integer.parseInt(val.toString()); } Thread.sleep(2000); System.out.println("批次[" + sendPatch +"]====發送郵件====" + serviceKey); jedis.incr(sourceKey); // 批次加1 redisLock.tryUnlock(serviceKey); // todo 解鎖 } finally { jedis.close(); } }
如果獲取不到鎖,那么取消這個任務的執行,看起來很完美對不對?
實際上沒有解決的問題還有很多。
- 多個定時任務的多個並發執行sendmail,key如何保證唯一?
可以使用實例的ip+端口做唯一key,這樣能夠保證多個實例的唯一性
- 兩台服務器時間差超過30s怎么辦?
通過中間媒介來確定時間。或者在服務器中杜絕這個問題
- 最重要的問題還是在於,兩台服務器的時間有可能有細微差別,他們本身就有可能不是並發的
這一點在分布式定時任務領域里很重要。
僅僅是加了一個同步鎖是遠遠不夠的
解決方案可以是根據業務的不同來設置不同的鎖超時時間,例如某個業務定時任務,每天只可以執行一次,那么將超時時間設置為1個小時最保險,如果某個定時任務每分鍾執行,執行操作時間大約20s,那你可以將超時時間設置成30s。
另一個解決方案是設置一個統一的、中心級別的定時任務,任務負責派發消息,通過消息隊列的方式來做定時,這里就不細表,這種方式比較適合異構、或者跨網絡、跨機房級別的分布式。
可以對redis鎖做一次小小的改版升級,使用aop加注解來完成鎖的配置:
我們定義一個方法級別的aop注解
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * redis lock * @author taifeng zhang * */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface RedisLockAop { String key(); /** * 兩種類型可選 * wait = 等待鎖 * return = 取消執行 * */ String type() default "wait"; int timeout() default 30; }
然后通過aop,去為加了注解的方法做鎖操作
import com.halfway.halfway.redis.RedisLock; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.After; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; /** * redislock aop實現 * @author by taifeng zhang * */ @Component @Aspect public class RedisLockAopAspect { private RedisLock redisLock = new RedisLock(); @Around("@within(com.halfway.halfway.redis.lockAop.RedisLockAop) && @annotation(lock)") public Object excuteAop (ProceedingJoinPoint pjp, RedisLockAop lock) throws Throwable { if ("wait".equals(lock.type())) { while (!redisLock.tryLock(lock.key(), lock.timeout())) {} // todo 等待鎖 } else if ("return".equals(lock.type())) { if (!redisLock.tryLock(lock.key(), lock.timeout())) { return null; // todo 取消執行 } } else { throw new NullPointerException("type只可以是wait或者return"); } Object val = pjp.proceed(); redisLock.tryUnlock(lock.key()); return val; } }
這個方式的好處是鎖與代碼解耦,無需關注鎖的內部實現變化
@Scheduled(cron = "0/30 * * * * ?") @RedisLockAop(key = "serviceIp:port", type="return", timeout=15) public void serviceA () throws InterruptedException { this.sendEmail("service"); }