設想這樣的一個場景,你要批量的執行一系列redis命令,例如執行100次get key,這時你要向redis請求100次+獲取響應100次。如果能一次性將100個請求提交給redis server,執行完成之后批量的獲取相應,只需要向redis請求1次,然后批量執行完命令,一次性結果,性能是不是會好很多呢?
答案是肯定的,節約的時間是客戶端client和服務器redis server之間往返網絡延遲的時間。這個時間可以用ping命令查看。
網絡延遲高:批量執行,性能提升明顯
網絡延遲低(本機):批量執行,性能提升不明顯
某些客戶端(java和python)提供了一種叫做pipeline的編程模式用來解決批量提交請求的方式。
這里我們用python客戶端來舉例說明一下。
1、pipeline
網絡延遲
client與server機器之間網絡延遲如下,大約是30ms。

測試用例
分別執行其中的try_pipeline和without_pipeline統計處理時間。
# -*- coding:utf-8 -*-
import redis
import time
from concurrent.futures import ProcessPoolExecutor
r = redis.Redis(host='10.93.84.53', port=6379, password='bigdata123')
def try_pipeline():
start = time.time()
with r.pipeline(transaction=False) as p:
p.sadd('seta', 1).sadd('seta', 2).srem('seta', 2).lpush('lista', 1).lrange('lista', 0, -1)
p.execute()
print time.time() - start
def without_pipeline():
start = time.time()
r.sadd('seta', 1)
r.sadd('seta', 2)
r.srem('seta', 2)
r.lpush('lista', 1)
r.lrange('lista', 0, -1)
print time.time() - start
def worker():
while True:
try_pipeline()
with ProcessPoolExecutor(max_workers=12) as pool:
for _ in range(10):
pool.submit(worker)
結果分析
try_pipeline平均處理時間:0.04659
without_pipeline平均處理時間:0.16672
我們的批量里有5個操作,在處理時間維度上性能提升了4倍!
網絡延遲大約是30ms,不使用批量的情況下,網絡上的時間損耗就有0.15s(30ms*5)以上。而pipeline批量操作只進行一次網絡往返,所以延遲只有0.03s。可以看到節省的時間基本都是網路延遲。
2、pipeline與transation
pipeline不僅僅用來批量的提交命令,還用來實現事務transation。
這里對redis事務的討論不會太多,只是給出一個demo。詳細的描述你可以參見這篇博客。redis事務
細心的你可能發現了,使用transaction與否不同之處在與創建pipeline實例的時候,transaction是否打開,默認是打開的。
# -*- coding:utf-8 -*-
import redis
from redis import WatchError
from concurrent.futures import ProcessPoolExecutor
r = redis.Redis(host='127.0.0.1', port=6379)
# 減庫存函數, 循環直到減庫存完成
# 庫存充足, 減庫存成功, 返回True
# 庫存不足, 減庫存失敗, 返回False
def decr_stock():
# python中redis事務是通過pipeline的封裝實現的
with r.pipeline() as pipe:
while True:
try:
# watch庫存鍵, multi后如果該key被其他客戶端改變, 事務操作會拋出WatchError異常
pipe.watch('stock:count')
count = int(pipe.get('stock:count'))
if count > 0: # 有庫存
# 事務開始
pipe.multi()
pipe.decr('stock:count')
# 把命令推送過去
# execute返回命令執行結果列表, 這里只有一個decr返回當前值
print pipe.execute()[0]
return True
else:
return False
except WatchError, ex:
# 打印WatchError異常, 觀察被watch鎖住的情況
print ex
pipe.unwatch()
def worker():
while True:
# 沒有庫存就退出
if not decr_stock():
break
# 實驗開始
# 設置庫存為100
r.set("stock:count", 100)
# 多進程模擬多個客戶端提交
with ProcessPoolExecutor(max_workers=2) as pool:
for _ in range(10):
pool.submit(worker)

