Python Redis pipeline操作(秒殺實現)


設想這樣的一個場景,你要批量的執行一系列redis命令,例如執行100次get key,這時你要向redis請求100次+獲取響應100次。如果能一次性將100個請求提交給redis server,執行完成之后批量的獲取相應,只需要向redis請求1次,然后批量執行完命令,一次性結果,性能是不是會好很多呢?

答案是肯定的,節約的時間是客戶端client和服務器redis server之間往返網絡延遲的時間。這個時間可以用ping命令查看。

網絡延遲高:批量執行,性能提升明顯

網絡延遲低(本機):批量執行,性能提升不明顯

某些客戶端(java和python)提供了一種叫做pipeline的編程模式用來解決批量提交請求的方式。

這里我們用python客戶端來舉例說明一下。

 

1、pipeline

網絡延遲

client與server機器之間網絡延遲如下,大約是30ms。

 

測試用例

分別執行其中的try_pipeline和without_pipeline統計處理時間。 

復制代碼
# -*- coding:utf-8 -*-

import redis
import time
from concurrent.futures import ProcessPoolExecutor

r = redis.Redis(host='10.93.84.53', port=6379, password='bigdata123')


def try_pipeline():
    start = time.time()
    with r.pipeline(transaction=False) as p:
        p.sadd('seta', 1).sadd('seta', 2).srem('seta', 2).lpush('lista', 1).lrange('lista', 0, -1)
        p.execute()
    print time.time() - start


def without_pipeline():
    start = time.time()
    r.sadd('seta', 1)
    r.sadd('seta', 2)
    r.srem('seta', 2)
    r.lpush('lista', 1)
    r.lrange('lista', 0, -1)
    print time.time() - start


def worker():
    while True:
        try_pipeline()

with ProcessPoolExecutor(max_workers=12) as pool:
    for _ in range(10):
        pool.submit(worker)
復制代碼

結果分析

try_pipeline平均處理時間:0.04659

without_pipeline平均處理時間:0.16672

我們的批量里有5個操作,在處理時間維度上性能提升了4倍!

網絡延遲大約是30ms,不使用批量的情況下,網絡上的時間損耗就有0.15s(30ms*5)以上。而pipeline批量操作只進行一次網絡往返,所以延遲只有0.03s。可以看到節省的時間基本都是網路延遲。

 

2、pipeline與transation

pipeline不僅僅用來批量的提交命令,還用來實現事務transation。

這里對redis事務的討論不會太多,只是給出一個demo。詳細的描述你可以參見這篇博客。redis事務

細心的你可能發現了,使用transaction與否不同之處在與創建pipeline實例的時候,transaction是否打開,默認是打開的。

復制代碼
# -*- coding:utf-8 -*-

import redis
from redis import WatchError
from concurrent.futures import ProcessPoolExecutor

r = redis.Redis(host='127.0.0.1', port=6379)


# 減庫存函數, 循環直到減庫存完成
# 庫存充足, 減庫存成功, 返回True
# 庫存不足, 減庫存失敗, 返回False
def decr_stock():

    # python中redis事務是通過pipeline的封裝實現的
    with r.pipeline() as pipe:
        while True:
            try:
                # watch庫存鍵, multi后如果該key被其他客戶端改變, 事務操作會拋出WatchError異常
                pipe.watch('stock:count')
                count = int(pipe.get('stock:count'))
                if count > 0:  # 有庫存
                    # 事務開始
                    pipe.multi()
                    pipe.decr('stock:count')
                    # 把命令推送過去
                    # execute返回命令執行結果列表, 這里只有一個decr返回當前值
                    print pipe.execute()[0]
                    return True
                else:
                    return False
            except WatchError, ex:
                # 打印WatchError異常, 觀察被watch鎖住的情況
                print ex
                pipe.unwatch()


def worker():
    while True:
        # 沒有庫存就退出
        if not decr_stock():
            break


# 實驗開始
# 設置庫存為100
r.set("stock:count", 100)

# 多進程模擬多個客戶端提交
with ProcessPoolExecutor(max_workers=2) as pool:
    for _ in range(10):
        pool.submit(worker)
復制代碼


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM