新姿勢!Redis中調用Lua腳本以實現原子性操作


背景:有一服務提供者Leader,有多個消息訂閱者Workers。Leader是一個排隊程序,維護了一個用戶隊列,當某個資源空閑下來並被分配至隊列中的用戶時,Leader會向訂閱者推送消息(消息帶有唯一標識ID),訂閱者在接收到消息后會進行特殊處理並再次推往前端。

問題:前端只需要接收到一條由Worker推送的消息即可,但是如果Workers不做消息重復推送判斷的話,會導致前端收到多條消息推送,從而影響正常業務邏輯。

方案一(未通過)

在Worker接收到消息時,嘗試先從redis緩存中根據消息的ID獲取值,有以下兩種情況:

  • 如果值不存在,則表示當前這條消息是第一次被推送,可以執行繼續執行推送程序,當然,不要忘了將當前消息ID作為鍵插入緩存中,並設置一個過期時間,標記這條消息已經被推送過了。
  • 如果值存在,則表示當前這條消息是被推送過的,跳過推送程序。

代碼可以這么寫:

public void waitingForMsg() {
    // Message Received.
    String value = redisTemplate.opsForValue().get("msg_pushed_" + msgId);
    if (!StringUtils.hasText(value)) {
        // 當不能從緩存中讀取到數據時,表示消息是第一次被推送
        // 趕緊往緩存中插入一個標識,表示當前消息已經被推送過了
        redisTemplate.opsForValue().set("msg_pushed_" + msgId, "1");
        // 再設置一個過期時間,防止數據無限制保留
        redisTemplate.expire("msg_pushed_" + msgId, 20, TimeUnit.SECONDS);
        // 接下來就可以執行推送操作啦
        this.pushMsgToFrontEnd();
    }
}

看起來似乎是沒啥問題,但是我們從redis的角度分析一下請求,看看是不是真的沒問題。

> get msg_pushed_1		# 此時Worker1嘗試獲取值
> get msg_pushed_1		# Worker2也沒閑着,執行了這句話,並且時間找得剛剛好,就在Worker1准備插入值之前
> set msg_pushed_1 "1"  # Worker1覺得消息沒有被推送,插入了一個值
> set msg_pushed_1 "1"  # Worker2也這么覺得,做了同樣的一件事

你看,還是有可能會往前端推送多次消息,所以這個方案不通過。

再仔細想一想,出現這個問題的原因是啥?———— 就是在執行get和set命令時,沒有保持原子性操作,導致其他命令有機可趁,那是不是可以把get和set命令當成一整個部分執行,不讓其他命令插入執行呢?

有很多方案可以實現,例如給鍵加鎖或者添加事務可能可以完成這個操作。但是我們今天討論一下另外一種方案,在Redis中執行Lua腳本。


方案二

我們可以看一下Redis官方文檔對Lua腳本原子性的解釋

Atomicity of scripts

Redis uses the same Lua interpreter to run all the commands. Also Redis guarantees that a script is executed in an atomic way: no other script or Redis command will be executed while a script is being executed. This semantic is similar to the one of MULTI / EXEC. From the point of view of all the other clients the effects of a script are either still not visible or already completed.

大致意思是說:我們Redis采用相同的Lua解釋器去運行所有命令,我們可以保證,腳本的執行是原子性的。作用就類似於加了MULTI/EXEC。

好,原子性有保證了,那么我們再看看編寫語法。

> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
1) "key1"
2) "key2"
3) "first"
4) "second"

由前至后的命令解釋(Arg 表示參數的意思 argument):

​ eval: Redis執行Lua腳本的命令,后接腳本內容及各參數。這個命令是從2.6.0版本才開始支持的。

​ 1st. Arg : Lua腳本,其中的KEYS[]和ARGV[]是傳入script的參數 。

​ 2nd. Arg: 后面跟着的KEY個數n,從第三個參數開始的總共n個參數會被作為KEYS傳入script中,在script中可以通過KEYS[1], KEYS[2]…格式讀取,下標從1開始 。

​ Remain Arg: 剩余的參數可以在腳本中通過ARGV[1], ARGV[2]…格式讀取 ,下標從1開始 。

我們執行腳本內容是return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}表示返回傳入的參數,所以我們可以看到參數被原封不動的返回了。

接着,我們再來實戰一下,在Lua腳本中調用Redis方法吧。

我們可以在Lua腳本中通過以下兩個命令調用redis的命令程序

  • redis.call()
  • redis.pcall()

兩者的作用是一樣的,但是程序出錯時的返回結果略有不同。

img

使用方法,命令和在Redis中執行一模一樣:

> eval "return redis.call('set', KEYS[1], ARGV[1])" 1 foo bar
OK
> eval "return redis.call('get', KEYS[1])" 1 foo
"bar"

是不是很簡單,說了這么多,我們趕緊來現學現賣,寫一個腳本應用在我們的場景中吧。

> eval "if redis.call('get', KEYS[1]) == false then redis.call('set', KEYS[1], ARGV[1]) redis.call('expire', KEYS[1], ARGV[2]) return 0 else return 1 end" 1 msg_push_1 "1" 10

腳本的意思和我們之前在方案一中寫的程序邏輯一樣,先判斷緩存中是否存在鍵,如果不存在則存入鍵和其值,並且設置失效時間,最后返回0;如果存在則返回1。PS: 如果對if redis.call('get', KEYS[1]) == false這里為什么得到的結果要與false比較有疑問的話,可以看最后的Tip。

  • 執行第一次:我們發現返回值0,並且我們看到緩存中插入了一條數據,鍵為msg_push_1、值為"1"

  • 在失效前,執行多次:我們發現返回值一直為1。並且在執行第一次后的10秒,該鍵被自動刪除。

將以上邏輯遷入我們java代碼后,就是下面這個樣子啦

public boolean isMessagePushed(String messageId) {
    Assert.hasText(messageId, "消息ID不能為空");

    // 使用lua腳本檢測值是否存在
    String script = "if redis.call('get', KEYS[1]) == false then redis.call('set', KEYS[1], ARGV[1]) redis.call('expire', KEYS[1], ARGV[2]) return 0 else return 1 end";

    // 這里使用Long類型,查看源碼可知腳本返回值類型只支持Long, Boolean, List, or deserialized value type.
    DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
    redisScript.setScriptText(script);
    redisScript.setResultType(Long.class);

    // 設置key
    List<String> keyList = new ArrayList<>();
    // key為消息ID
    keyList.add(messageId);

    // 每個鍵的失效時間為20秒
    Long result = redisTemplate.execute(redisScript, keyList, 1, 20);

    // 返回true: 已讀、false: 未讀
    return result != null && result != 0L;
}

public void waitingForMsg() {
    // Message Received.
    if (!this.isMessagePushed(msgId)) {
        // 返回false表示未讀,接下來就可以執行推送操作啦
        this.pushMsgToFrontEnd();
    }
}

Tip

這里只是簡單的Redis中使用Lua腳本介紹,詳細的使用方法可以參考官方文檔,而且還有其他很多用法介紹。

對了,上面還有一個需要注意一下,就是關於Redis和Lua中變量的相互轉換,因為說起來啰哩啰嗦的,所以沒放在上文中,最后可以簡單說一下。

Redis to Lua conversion table.

  • Redis integer reply -> Lua number
  • Redis bulk reply -> Lua string
  • Redis multi bulk reply -> Lua table (may have other Redis data types nested)
  • Redis status reply -> Lua table with a single ok field containing the status
  • Redis error reply -> Lua table with a single err field containing the error
  • Redis Nil bulk reply and Nil multi bulk reply -> Lua false boolean type // 這里就是上面我們在腳本中做是否為空判斷的時候if redis.call('get', KEYS[1]) == false,采用與false比較的原因。Redis的nil(類似null)會被轉換為Lua的false

Lua to Redis conversion table.

  • Lua number -> Redis integer reply (the number is converted into an integer)
  • Lua string -> Redis bulk reply
  • Lua table (array) -> Redis multi bulk reply (truncated to the first nil inside the Lua array if any)
  • Lua table with a single ok field -> Redis status reply
  • Lua table with a single err field -> Redis error reply
  • Lua boolean false -> Redis Nil bulk reply.

注意點:

​ Lua的Number類型會被轉為Redis的Integer類型,因此如果希望得到小數時,需要由Lua返回String類型的數字。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM