Redis 實戰 —— 05. Redis 其他命令簡介


發布與訂閱 P52

Redis 實現了發布與訂閱(publish/subscribe)模式,又稱 pub/sub 模式(與設計模式中的觀察者模式類似)。訂閱者負責訂閱頻道,發送者負責向頻道發送二進制字符串消息。每當有消息被發送至給定頻道時,頻道的所有訂閱者都會接收到消息。

發布與訂閱命令 P52
命令 格式 描述
SUBSCRIBE SUBSCRIBE channel [channel ...] 訂閱一個或多個頻道
UNSUBSCRIBE UNSUBSCRIBE [channel [channel ...]] 退訂一個或多個頻道;沒有指定頻道,則退訂全部頻道
PUBLISH PUBLISH channel message 給指定頻道發送消息,返回接收到消息的訂閱者數量
PSUBSCRIBE PSUBSCRIBE pattern [pattern ...] 訂閱一個或多個模式,與模式匹配的頻道均會訂閱
PUNSUBSCRIBE PUNSUBSCRIBE [pattern [pattern ...]] 退訂一個或多個模式;沒有指定模式,則退訂全部模式

相關演示代碼如下:

// 執行發布訂閱相關操作(注意:pubSubConn 中的 Conn 對象不能是 conn 對象,即必須建立兩個不同的連接)
func executePubSubOperation(pubSubConn redis.PubSubConn, conn redis.Conn) {
	// 監聽頻道消息並輸出
	go func() {
		for ; ; {
			switch result := pubSubConn.Receive().(type) {
			case redis.Message:
				// byte 轉 string
				resultMap := map[string]string  {
					"Channel": result.Channel,
					"Pattern": result.Pattern,
					"Data": string(result.Data),
				}
				handleResult(resultMap, nil)
			case redis.Subscription:
				handleResult(result, nil)
			}

		}
	}()

	// 訂閱兩個頻道(由於 Subscribe 內沒有執行 Receive,所以只有 error,沒有錯誤時就輸出 nil)
	// 訂閱者收到相應的消息訂閱信息,分別輸出 -> {subscribe channel_1 1} 和 {subscribe channel_2 2}
	handleResult(nil, pubSubConn.Subscribe("channel_1", "channel_2"))
	// 訂閱兩個模式,分別以 _1 和 g_2 為結尾的頻道 (由於 PSubscribe 內沒有執行 Receive,所以只有 error,沒有錯誤時就輸出 nil)
	// 訂閱者收到相應的消息訂閱信息,分別輸出 -> {psubscribe *_1 3} 和 {psubscribe *g_2 4}
	handleResult(nil, pubSubConn.PSubscribe("*_1", "*g_2"))

	time.Sleep(time.Second)

	// 發布消息到頻道 channel_1,輸出 -> 2,兩個訂閱者接收到消息
	// 訂閱者分別輸出 -> map[Channel:channel_1 Data:channel1 Pattern:] 和 map[Channel:channel_1 Data:channel1 Pattern:*_1]
	handleResult(conn.Do("PUBLISH", "channel_1", "channel1"))
	// 發布消息到頻道 channel_2,輸出 -> 1,一個訂閱者接收到消息
	// 訂閱者輸出 -> map[Channel:channel_2 Data:channel1 Pattern:]
	handleResult(conn.Do("PUBLISH", "channel_2", "channel1"))

	// 退訂兩個頻道(由於 Subscribe 內沒有執行 Receive,所以只有 error,沒有錯誤時就輸出 nil)
	// 訂閱者收到相應的消息退訂信息,分別輸出 -> {unsubscribe channel_1 3} 和 {unsubscribe channel_2 2}
	handleResult(nil, pubSubConn.Unsubscribe("channel_1", "channel_2"))
	// 退訂兩個頻道(由於 Subscribe 內沒有執行 Receive,所以只有 error,沒有錯誤時就輸出 nil)
	// 訂閱者收到相應的消息退訂信息,分別輸出 -> {punsubscribe *_1 1} 和 {punsubscribe *g_2 0}
	handleResult(nil, pubSubConn.PUnsubscribe("*_1", "*g_2"))

	time.Sleep(time.Second)
}
風險 P54
  • 穩定性:舊版 Redis 的客戶端讀取消息不夠快時,不斷積壓的消息就會使 Redis 的緩沖區越來越大,可能導致 Redis 的速度變慢,甚至直接崩潰,也有使 Redis 可能被操作系統強制殺死。新版 Redis 會自動斷開不符合 client-output-buffer-limit pubsub 配置選項要求的客戶端。
  • 可靠性:任何網絡系統在執行操作時都有可能會遇上斷線情況,而斷線產生的連接錯誤通常會使得網絡連接兩端中的其中一端進行重新連接。如果客戶端在執行訂閱操作的過程中斷線,那么客戶端將丟失在斷線期間發送的所有消息。

排序 P54

SORT 命令可以對列表、集合和有序集合進行排序 ,可以將 SORT 命令看作使 SQL 中的 order by 子句。 P55

命令 格式 描述
SORT SORT key [BY pattern] [LIMIT offset count] [GET pattern [GET pattern ...]] [ASC|DESC] [ALPHA] [STORE destination] 根據給定的選項,返回或保存給定列表、集合、有序集合 key 中經過排序的元素

可實現功能: P55

  • 根據升序或降序排序元素(使用 [ASC|DESC],默認為升序)
  • 將元素看作數字或者字符串進行排序(使用 [ALPHA] 可以當作字符串排序,默認為數字)
  • 使用被排序元素之外的其他值作為權重來排序,甚至還可以從輸入的列表、集合、有序集合以外的其他地方進行取值(使用 [BY pattern] 可以根據指定值排序;可以使用不存在的鍵作為參數選項跳過排序沒直接返回結果)
  • 使用被排序元素之外的其他值作為返回結果(使用 [GET pattern [GET pattern ...]] 可以根據排序結果返回相應的值)
  • 保存排序結果(使用 [STORE destination] 可以指定將結果保存到指定 key,此時返回保存的元素的數量)
  • 限制返回結果(使用 [LIMIT offset count] 可以指定要跳過的元素數量和返回的元素數量)

相關演示代碼如下:

// 執行 SORT 命令
func executeSortOperation(conn redis.Conn) {
	// 刪除原有值
	handleResult(redis.Int(conn.Do("DEL", "id", "age", "name", "destination")))
	// 初始化
	handleResult(redis.Int(conn.Do("RPUSH", "id", 1, 4, 3, 2, 5)))
	handleResult(redis.String(conn.Do("SET", "age_1", 15)))
	handleResult(redis.String(conn.Do("SET", "age_2", 14)))
	handleResult(redis.String(conn.Do("SET", "age_3", 11)))
	handleResult(redis.String(conn.Do("SET", "age_4", 12)))
	handleResult(redis.String(conn.Do("SET", "age_5", 10)))
	handleResult(redis.String(conn.Do("SET", "name_1", "tom")))
	handleResult(redis.String(conn.Do("SET", "name_2", "jerry")))
	handleResult(redis.String(conn.Do("SET", "name_3", "bob")))
	handleResult(redis.String(conn.Do("SET", "name_4", "mary")))
	handleResult(redis.String(conn.Do("SET", "name_5", "jack")))

	// 根據 id 降序排序,跳過第一個元素,獲取接下來的兩個元素,輸出 -> [4 3]
	handleResult(redis.Ints(conn.Do("SORT", "id", "LIMIT", "1", "2", "DESC")))
	// 根據 age_{id} 升序排序,按照 id age_{id} name_{id} 順序返回結果,輸出 -> [5 10 jack 3 11 bob 4 12 mary 2 14 jerry 1 15 tom]
	handleResult(redis.Strings(conn.Do("SORT", "id", "BY", "age_*", "GET", "#", "GET", "age_*", "GET", "name_*", "ALPHA")))
	// 根據 name_{id} 字典序降序排序,按照 id age_{id} name_{id} 順序返回結果,存儲到 destination 中
	// 輸出 -> 15
	handleResult(redis.Int(conn.Do("SORT", "id", "BY", "name_*", "GET", "#", "GET", "age_*", "GET", "name_*", "ALPHA", "DESC", "STORE", "destination")))
	// 輸出 列表 結果,輸出 -> [1 15 tom 4 12 mary 2 14 jerry 5 10 jack 3 11 bob]
	handleResult(redis.Strings(conn.Do("LRANGE", "destination", 0, -1)))
}

基本的 Redis 事務

Redis 有 5 個命令可以讓用戶在不被打斷的情況下對多個鍵執行操作,它們分別是: WATCHMULTIEXECUNWATCHDISCART 。基本的 Redis 事務只用 MULTIEXEC 即可,使用多個命令的事務將在以后進行介紹。 P56

Redis 的基本事務可以讓一個客戶端在不被其他客戶端打斷的情況下執行多個命令。當一個事務執行完畢之后, Redis 才會處理其他客戶端的命令。 P56

假如某個(或某些) key 正處於 WATCH 命令的監視之下,且事務塊中有和這個(或這些) key 相關的命令,那么 EXEC 命令只在這個(或這些) key 沒有被其他命令所改動的情況下執行並生效,否則該事務被打斷(abort)。

命令 格式 描述
MULTI MULTI 標記一個事務塊的開始,總是返回 OK
EXEC EXEC 執行所有事務塊內的命令,按順序返回命令的執行結果。當操作被打斷時,返回 nil

相關演示代碼如下:

// 執行事務命令
func executeTransactionOperation(conn redis.Conn) {
	// 刪除原有值
	handleResult(redis.Int(conn.Do("DEL", "counter")))
	// 開啟事務(采用流水線方式,降低通信開銷)
	handleResult(nil, conn.Send("MULTI"))
	// 事務中執行自增操作(采用流水線方式,降低通信開銷)
	handleResult(nil, conn.Send("INCR", "counter"))
	handleResult(nil, conn.Send("INCR", "counter"))
	handleResult(nil, conn.Send("INCR", "counter"))
	// 執行命令,依次執行自增操作,分別返回操作結果,輸出 -> [1 2 3]
	handleResult(redis.Ints(conn.Do("EXEC")))
}
練習題:移除競爭條件 P58

簡單實踐 - 文章投票VoteArticle 函數內曾說明沒有事務控制,會存在並發問題。該函數包含一個競爭條件以及一個因為競爭條件而出現的 bug 。函數的競爭條件可能會造成內存泄漏,而函數的 bug 則可能會導致不正確的投票結果出現。你能想辦法修復它們嗎?

提示:如果你覺得很難理解競爭條件為什么會導致內存泄漏,那么可以在分析 簡單實踐 - 文章投票 中的 PostArticle 的函數的同時,閱讀一下 6.2.5 節。

  • 感覺還是無法理解為什么會有這種情況,強行猜測以下可能性(雖然都不是競爭條件造成的):

    • PostArticle 函數中,在將作者加入到投票用戶集合中后,給其設定過期時間。如果設定過期時間之前由於某些原有異常導致沒有進行相關操作,那么這個集合將一直在內存中,不會過期,從而造成內存泄漏。
    • VoteArticle 函數中,如果將投票用戶添加到投票用戶集合中后,還沒來得及給文章的相關信息進行設置,那么這個用戶以后不能再投票,並且文章的投票信息不對。
  • 不是太明白究竟在競爭什么,只能針對以上問題處理。用事務只能再添加一個集合在事務中標記事務是否執行成功,處理流程大致如下:

    1. 先將用戶與文章作為值加入到這個集合
    2. 再將用戶加入到投票集合中
    3. 然后開啟事務,依次發送更新信息的命令和刪除那個集合中的相關信息,並執行
    4. 最后有一個 worker 掃描這個集合,將其中值拿出來解析出用戶和文章,再查改用戶是否已在集合中,如果在集合中,則重新執行 步驟3,最后刪除該值
練習題:提高性能 P58

簡單實踐 - 文章投票ListArticles 函數在獲取整個頁面的文章時,需要在 Redis 與客戶端之間最多會進行 26 次通信往返,這種做法十分低效,你能否想個辦法將 ListArticles 函數的往返次數降低為 2 次呢?

提示:使用流水線

  • 獲取文章列表時,先獲取相應的 id 列表(最多 25 個),再循環獲取每個 id 對應的文章,所以最多會進行 26 次通信往返
  • 由於必須先獲取 id 列表,再獲取每個 id 對應的文章,所以只能將這兩塊分開,所以最低至少有 2 次通信往返。大致流程如下:
    1. 先獲取 id 列表
    2. 使用流水線,依次將獲取每個 id 的文章的命令發送至緩沖區,最后與服務端通信並執行命令(Go 中可以使用上述事務演示代碼的方式進行操作 )
    3. 最后按照順序解析結果

過期時間 P58

只有少數幾個命令可以原子地為鍵設置過期時間,並且對於列表、集合、哈希表和有序集合這樣的容器來說,鍵過期命令只能為整個鍵設置過期時間,而沒辦法為鍵里面的單個元素設置過期時間(可以使用存儲時間戳的有序集合來實現針對單個元素的過期時間;也可以以前綴的形式將容器中的單個元素變為字符串)。 P58

用於處理過期時間的 Redis 命令 P59
命令 格式 描述
PERSIST PERSIST key 移除鍵的過期時間
TTL TTL key 查看鍵距離過期時間還有多少秒
EXPIRE EXPIRE key seconds 讓鍵在指定的秒數之后過期
EXPIREAT EXPIREAT key timestamp 讓鍵在指定的 UNIX 秒級時間戳過期
PTTL PTTL key 查看鍵距離過期時間還有多少毫秒
PEXPIRE PEXPIRE key milliseconds 讓鍵在指定的毫秒數之后過期
PEXPIREAT PEXPIREAT key milliseconds-timestamp 讓鍵在指定的 UNIX 毫秒級時間戳過期

相關演示代碼如下:

// 指定過期時間相關的命令
func executeExpirationOperation(conn redis.Conn) {
	// 刪除原有值
	handleResult(redis.Int(conn.Do("DEL", "string")))
	// 設置字符串的值為 value,輸出 -> OK,string 變為 -> value
	handleResult(redis.String(conn.Do("SET", "string", "value")))
	// 查看 string 的過期時間,輸出 -> -1,表示不過期
	handleResult(redis.Int(conn.Do("TTL", "string")))
	// 設置 string 在 3 秒后過期,輸出 -> 1
	handleResult(redis.Int(conn.Do("EXPIRE", "string", 3)))
	time.Sleep(time.Second)
	// 查看 string 的過期時間,輸出 -> 2
	handleResult(redis.Int(conn.Do("TTL", "string")))
	// 移除 string 的過期時間,輸出 -> 1
	handleResult(redis.Int(conn.Do("PERSIST", "string")))
	// 查看 string 的過期時間,輸出 -> -1,表示不過期
	handleResult(redis.Int(conn.Do("TTL", "string")))

	// 設置 string 在當前時間 2500 毫秒后過期,輸出 -> 1
	handleResult(redis.Int(conn.Do("PEXPIREAT", "string", time.Now().UnixNano() / 1e6 + 2500)))
	time.Sleep(time.Second)
	// 查看 string 的過期時間,輸出 -> 1499,表示還有 1499 毫秒過期
	handleResult(redis.Int(conn.Do("PTTL", "string")))
	time.Sleep(2 * time.Second)
	// 查看 string 的過期時間,輸出 -> -2,表示已過期
	handleResult(redis.Int(conn.Do("PTTL", "string")))
}
練習題:使用 EXPIRE 命令代替時間戳有序集合 P59

簡單實踐 - Web應用中使用了一個根據時間戳排序、用於清除會話信息的有序集合,通過這個有序集合,程序可以在清理會話的時候,對用戶瀏覽過的商品以及用戶購物車里面的商品進行分析。但是,如果我們決定不對商品進行分析的話,那么就可以使用 Redis 提供的過期時間操作來自動清理過期的會話信息,而無須使用清理函數。那么,你能否修改簡單實踐 - Web應用中定義的 UpdateToken 函數和 UpdateCartItem 函數,讓它們使用過期時間操作來刪除會話信息,從而代替目前使用有序集合來記錄並清除會話信息的做法呢?

  • UpdateToken 函數:令牌於 userId 的對應關系不在存儲於哈希表中,而是以前綴的形式將容器中的單個元素變為字符串(上面提到過),並設置過期時間,並移除最近操作時間有序集合,這樣令牌到期后就會自動刪除,不需要清理函數了。
  • UpdateCartItem 函數:由於當時此處把 Redis 當作數據庫使用,認為購物車不應該隨登錄態的失效而消失,所以購物車與 userId 掛鈎,不存在上述問題。但是如果要讓購物車也自動過期,就需要在 UpdateToken 函數內同時設置購物車的過期時間即可。

本文首發於公眾號:滿賦諸機(點擊查看原文) 開源在 GitHub :reading-notes/redis-in-action


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM