背景
由於Redis的單線程服務模式,命令keys *會阻塞正常的業務請求,不建議使用keys * pattern的方法進行查詢,可能會使服務器卡頓而出現事故。如何獲取指定的 key?
可以采用Redis提供的SCAN命令。SCAN 命令是一個基於游標的迭代器(cursor based iterator):SCAN 命令每次被調用之后都會向用戶返回一個新的游標, 用戶在下次迭代時會使用這個新游標作為 SCAN 命令的游標參數, 以此來延續之前的迭代過程。當 SCAN 命令的游標參數被設置為 0 時, 服務器將開始一次新的迭代;而當服務器向用戶返回值為 0 的游標時, 表示迭代已結束。
SCAN的語法如下:
SCAN cursor [MATCH pattern] [COUNT count]
其中 cousor 是游標,MATCH 則支持正則匹配,我們正好可以利用此功能,比如匹配 前綴為"dba_"的key, COUNT 是每次獲取多少個key。
redis 127.0.0.1:6379> scan 0 1) "17" 2) 1) "key:12" 2) "key:8" 3) "key:4" 4) "key:14" 5) "key:16" 6) "key:17" 7) "key:15" 8) "key:10" 9) "key:3" 10) "key:7" 11) "key:1" redis 127.0.0.1:6379> scan 17 1) "0" 2) 1) "key:5" 2) "key:18" 3) "key:0" 4) "key:2" 5) "key:19" 6) "key:13" 7) "key:6" 8) "key:9" 9) "key:11"
在上面這個例子中, 第一次迭代使用 0 作為游標, 表示開始一次新的迭代。第二次迭代使用的是第一次迭代時返回的游標, 也即是命令回復第一個元素的值 —— 17 。 在第二次調用 SCAN 命令時, 命令返回了游標 0 , 這表示迭代已經結束, 整個數據集(collection)已經被完整遍歷過了。
從上面的示例可以看到, SCAN 命令的回復是一個包含兩個元素的數組, 第一個數組元素是用於進行下一次迭代的新游標, 而第二個數組元素則是一個數組, 這個數組中包含了所有被迭代的元素。
例如: SCAN 0 MATCH aaa* COUNT 5 #表示從游標0開始查詢aaa開頭的key,每次返回5條,但是這個5條不一定,只是給Redis了參考值,具體返回數量看Redis。
注意:以 0 作為游標開始一次新的迭代, 一直調用 SCAN 命令, 直到命令返回游標 0 , 我們稱這個過程為一次完整遍歷(full iteration)。 我們會在后面的代碼實現中利用此特點。
Python的redis 模塊提供 scan_iter 迭代器來遍歷key,其返回的結果迭代器對象。
In [53]: ret=r.scan_iter('dba_*',20) In [54]: print ret <generator object scan_iter at 0x102ff45a0>
至此,我們解決了如何獲取數據的問題,下面思考第二個問題。
如何執行刪除
這個相對比較簡單,Redis 提供DEL 命令
127.0.0.1:6379[2]> get "dba_7" "r06cVX9" 127.0.0.1:6379[2]> get "dba_1" "ETX57PA" 127.0.0.1:6379[2]> del "dba_7" "dba_1" (integer) 2 127.0.0.1:6379[2]>
在redis-py 中,提供了delete(key),delete(*key)的函數, 其中參數 *key 是多個值的列表。 到這里,我們大致可以想到獲取key,然后批量刪除
(mytest)➜ test git:(master) ✗ python delete_key.py initial keys successfully,use time: 90.2497739792 normal ways end at: 68.685477972 normal ways delete numbers: 1000000
常規方式的刪除10W個key耗時68.7秒,如果是1.2億個key 要多少時間呢?68*1000/3600=18.8小時。能不能更快呢?
如何提高執行速度
Redis本身是基於Request/Response協議的,客戶端發送一個命令,等待Redis應答,Redis在接收到命令,處理后應答。其中發送命令加上返回結果的時間稱為(Round Time Trip)RRT-往返時間。如果客戶端發送大量的命令給Redis,那就是等待上一條命令應答后再執行再執行下一條命令,這中間不僅僅多了RTT,而且還頻繁的調用系統IO,發送網絡請求。
Pipeline(流水線)功能極大的改善了上面的缺點。Pipeline能將一組Redis命令進行組裝,然后一次性傳輸給Redis,再將Redis執行這組命令的結果按照順序返回給客戶端。
需要注意的是Pipeline 雖然好用,但是Pipline組裝的命令個數不能沒有限制,否則一次組裝數據量過大,一方面增加客戶端的等待時間,另一方面會造成網絡阻塞,需要批量組裝。使用Pepline 和常規方式的性能對比如下:
代碼
import redis import random import string import time pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=2) r = redis.Redis(connection_pool=pool) def random_str(): return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(7)) def init_keys(): start_time = time.time() for i in xrange(0, 20): key_name = 'dba_'+str(i) value_name = random_str() r.set(key_name, value_name) print 'initial keys successfully,use time:', time.time() - start_time def del_keys_without_pipe(): start_time = time.time() result_length = 0 for key in r.scan_iter(match='dba_*', count=2000): r.delete(key) result_length += 1 print "normal ways end at:", time.time() - start_time print "normal ways delete numbers:", result_length def del_keys_with_pipe(): start_time = time.time() result_length = 0 pipe = r.pipeline() for key in r.scan_iter(match='dba_*', count=5000): pipe.delete(key) result_length += 1 if result_length % 5000 == 0: pipe.execute() pip_time = time.time() print "use pipeline scan time ", time.time() - start_time pipe.execute() print "use pipeline end at:", time.time() - pip_time print "use pipeline ways delete numbers:", result_length def main(): init_keys() del_keys_without_pipe() init_keys() del_keys_with_pipe() if __name__ == '__main__': main()