背景:有一服務提供者Leader,有多個消息訂閱者Workers。Leader是一個排隊程序,維護了一個用戶隊列,當某個資源空閑下來並被分配至隊列中的用戶時,Leader會向訂閱者推送消息(消息帶有唯一標識ID),訂閱者在接收到消息后會進行特殊處理並再次推往前端。
問題:前端只需要接收到一條由Worker推送的消息即可,但是如果Workers不做消息重復推送判斷的話,會導致前端收到多條消息推送,從而影響正常業務邏輯。
方案一(未通過)
在Worker接收到消息時,嘗試先從redis緩存中根據消息的ID獲取值,有以下兩種情況:
- 如果值不存在,則表示當前這條消息是第一次被推送,可以執行繼續執行推送程序,當然,不要忘了將當前消息ID作為鍵插入緩存中,並設置一個過期時間,標記這條消息已經被推送過了。
- 如果值存在,則表示當前這條消息是被推送過的,跳過推送程序。
代碼可以這么寫:
public void waitingForMsg() {
// Message Received.
String value = redisTemplate.opsForValue().get("msg_pushed_" + msgId);
if (!StringUtils.hasText(value)) {
// 當不能從緩存中讀取到數據時,表示消息是第一次被推送
// 趕緊往緩存中插入一個標識,表示當前消息已經被推送過了
redisTemplate.opsForValue().set("msg_pushed_" + msgId, "1");
// 再設置一個過期時間,防止數據無限制保留
redisTemplate.expire("msg_pushed_" + msgId, 20, TimeUnit.SECONDS);
// 接下來就可以執行推送操作啦
this.pushMsgToFrontEnd();
}
}
看起來似乎是沒啥問題,但是我們從redis的角度分析一下請求,看看是不是真的沒問題。
> get msg_pushed_1 # 此時Worker1嘗試獲取值
> get msg_pushed_1 # Worker2也沒閑着,執行了這句話,並且時間找得剛剛好,就在Worker1准備插入值之前
> set msg_pushed_1 "1" # Worker1覺得消息沒有被推送,插入了一個值
> set msg_pushed_1 "1" # Worker2也這么覺得,做了同樣的一件事
你看,還是有可能會往前端推送多次消息,所以這個方案不通過。
再仔細想一想,出現這個問題的原因是啥?———— 就是在執行get和set命令時,沒有保持原子性操作,導致其他命令有機可趁,那是不是可以把get和set命令當成一整個部分執行,不讓其他命令插入執行呢?
有很多方案可以實現,例如給鍵加鎖或者添加事務可能可以完成這個操作。但是我們今天討論一下另外一種方案,在Redis中執行Lua腳本。
方案二
我們可以看一下Redis官方文檔對Lua腳本原子性的解釋。
Atomicity of scripts
Redis uses the same Lua interpreter to run all the commands. Also Redis guarantees that a script is executed in an atomic way: no other script or Redis command will be executed while a script is being executed. This semantic is similar to the one of MULTI / EXEC. From the point of view of all the other clients the effects of a script are either still not visible or already completed.
大致意思是說:我們Redis采用相同的Lua解釋器去運行所有命令,我們可以保證,腳本的執行是原子性的。作用就類似於加了MULTI/EXEC。
好,原子性有保證了,那么我們再看看編寫語法。
> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
1) "key1"
2) "key2"
3) "first"
4) "second"
由前至后的命令解釋(Arg 表示參數的意思 argument):
eval: Redis執行Lua腳本的命令,后接腳本內容及各參數。這個命令是從2.6.0版本才開始支持的。
1st. Arg : Lua腳本,其中的KEYS[]和ARGV[]是傳入script的參數 。
2nd. Arg: 后面跟着的KEY個數n,從第三個參數開始的總共n個參數會被作為KEYS傳入script中,在script中可以通過KEYS[1], KEYS[2]…格式讀取,下標從1開始 。
Remain Arg: 剩余的參數可以在腳本中通過ARGV[1], ARGV[2]…格式讀取 ,下標從1開始 。
我們執行腳本內容是return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}表示返回傳入的參數,所以我們可以看到參數被原封不動的返回了。
接着,我們再來實戰一下,在Lua腳本中調用Redis方法吧。
我們可以在Lua腳本中通過以下兩個命令調用redis的命令程序
- redis.call()
- redis.pcall()
兩者的作用是一樣的,但是程序出錯時的返回結果略有不同。

使用方法,命令和在Redis中執行一模一樣:
> eval "return redis.call('set', KEYS[1], ARGV[1])" 1 foo bar
OK
> eval "return redis.call('get', KEYS[1])" 1 foo
"bar"
是不是很簡單,說了這么多,我們趕緊來現學現賣,寫一個腳本應用在我們的場景中吧。
> eval "if redis.call('get', KEYS[1]) == false then redis.call('set', KEYS[1], ARGV[1]) redis.call('expire', KEYS[1], ARGV[2]) return 0 else return 1 end" 1 msg_push_1 "1" 10
腳本的意思和我們之前在方案一中寫的程序邏輯一樣,先判斷緩存中是否存在鍵,如果不存在則存入鍵和其值,並且設置失效時間,最后返回0;如果存在則返回1。PS: 如果對if redis.call('get', KEYS[1]) == false這里為什么得到的結果要與false比較有疑問的話,可以看最后的Tip。
-
執行第一次:我們發現返回值0,並且我們看到緩存中插入了一條數據,鍵為
msg_push_1、值為"1" -
在失效前,執行多次:我們發現返回值一直為1。並且在執行第一次后的10秒,該鍵被自動刪除。
將以上邏輯遷入我們java代碼后,就是下面這個樣子啦
public boolean isMessagePushed(String messageId) {
Assert.hasText(messageId, "消息ID不能為空");
// 使用lua腳本檢測值是否存在
String script = "if redis.call('get', KEYS[1]) == false then redis.call('set', KEYS[1], ARGV[1]) redis.call('expire', KEYS[1], ARGV[2]) return 0 else return 1 end";
// 這里使用Long類型,查看源碼可知腳本返回值類型只支持Long, Boolean, List, or deserialized value type.
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);
// 設置key
List<String> keyList = new ArrayList<>();
// key為消息ID
keyList.add(messageId);
// 每個鍵的失效時間為20秒
Long result = redisTemplate.execute(redisScript, keyList, 1, 20);
// 返回true: 已讀、false: 未讀
return result != null && result != 0L;
}
public void waitingForMsg() {
// Message Received.
if (!this.isMessagePushed(msgId)) {
// 返回false表示未讀,接下來就可以執行推送操作啦
this.pushMsgToFrontEnd();
}
}
Tip
這里只是簡單的Redis中使用Lua腳本介紹,詳細的使用方法可以參考官方文檔,而且還有其他很多用法介紹。
對了,上面還有一個坑需要注意一下,就是關於Redis和Lua中變量的相互轉換,因為說起來啰哩啰嗦的,所以沒放在上文中,最后可以簡單說一下。
Redis to Lua conversion table.
- Redis integer reply -> Lua number
- Redis bulk reply -> Lua string
- Redis multi bulk reply -> Lua table (may have other Redis data types nested)
- Redis status reply -> Lua table with a single ok field containing the status
- Redis error reply -> Lua table with a single err field containing the error
- Redis Nil bulk reply and Nil multi bulk reply -> Lua false boolean type // 這里就是上面我們在腳本中做是否為空判斷的時候
if redis.call('get', KEYS[1]) == false,采用與false比較的原因。Redis的nil(類似null)會被轉換為Lua的falseLua to Redis conversion table.
- Lua number -> Redis integer reply (the number is converted into an integer)
- Lua string -> Redis bulk reply
- Lua table (array) -> Redis multi bulk reply (truncated to the first nil inside the Lua array if any)
- Lua table with a single ok field -> Redis status reply
- Lua table with a single err field -> Redis error reply
- Lua boolean false -> Redis Nil bulk reply.
注意點:
Lua的Number類型會被轉為Redis的Integer類型,因此如果希望得到小數時,需要由Lua返回String類型的數字。
