Python 下redis 批量操作操作


方法一:使用 pipeline

  使用pipelining 發送命令時,redis server必須部分請求放到隊列中(使用內存)執行完畢后一次性發送結果,在 pipeline 使用期間,將“獨占”鏈接,無法進行非“管道”類型的其他操作,直至 pipeline 關閉;如果 pipeline 的指令集很多很龐大,為了不影響其他操作(redis 最大時間lua-time-limit默認是5s),可以使用其他新建新鏈接操作。批量操作如下:

import redis

r = redis.Redis(host='127.0.0.1', port=6379, password='1234567890')
with r.pipeline() as ctx:
    a = time.time()
    ctx.hset('current', "time2", a)
    ctx.hset('current', "time3", a)
    res = ctx.execute()
    print("result: ", res)

使用 pipe line 以樂觀鎖的形式執行事務操作,詳情:redis事務

# -*- coding:utf-8 -*-

import redis
from redis import WatchError
from concurrent.futures import ProcessPoolExecutor

r = redis.Redis(host='127.0.0.1', port=6379)

# 減庫存函數, 循環直到減庫存完成
# 庫存充足, 減庫存成功, 返回True
# 庫存不足, 減庫存失敗, 返回False
def decr_stock():
    # python中redis事務是通過pipeline的封裝實現的
    with r.pipeline() as pipe:
        while True:
            try:
                # watch庫存鍵, multi后如果該key被其他客戶端改變, 事務操作會拋出WatchError異常
                pipe.watch('stock:count')
                count = int(pipe.get('stock:count'))
                if count > 0:  # 有庫存
                    # 事務開始
                    pipe.multi()  # multi 判斷 watch 監控的 key 是否被其他客戶端改變
                    pipe.decr('stock:count')
                    # 把命令推送過去
                    # execute返回命令執行結果列表, 這里只有一個decr返回當前值
                    result = pipe.execute()[0]
                    print("result: ", result)
                    return True
                else:
                    return False
            except WatchError as e:
                # 打印WatchError異常, 觀察被watch鎖住的情況
                print(e.args)
            finally:
                pipe.unwatch()


def worker():
    while True:
        # 沒有庫存就退出
        if not decr_stock():
            break


# 實驗開始
# 設置庫存為100
r.set("stock:count", 100)

# 多進程模擬多個客戶端提交
with ProcessPoolExecutor(max_workers=2) as pool:
    for _ in range(10):
        pool.submit(worker)

方法二:使用 register_script 

分布執行,發送腳本到redis服務器,獲取一個本次連接的一個調用句柄,根據此句柄可以無數次執行不同參數調用

    import redis
    import time

    r = redis.Redis(host='127.0.0.1', port=31320, password='12345678')
    
    lua = """
    local key = KEYS[1]
    local field = ARGV[1]
    local timestamp_new = ARGV[2]
    
    -- get timestamp of the key in redis
    local timestamp_old = redis.call('hget', key, field)
    -- if timestamp_old == nil, it means the key is not exist
    if timestamp_old == nil or timestamp_old == false or timestamp_new > timestamp_old then
        redis.call('hset', key, field .. 1, timestamp_new)
        -- timestamp_new > timestamp_old
        return redis.pcall('hset', key, field, timestamp_new)
    end
    
    """

    cmd = r.register_script(lua)

    cur_time = time.time()
    cmd(keys=['current'], args=["time", cur_time])

register_script 調用 lua 來實現,需要注意 redis.call(method, key, field) 的返回值(nil,false,1),此處沒有鍵值返回的是false。如果中間有錯誤,所有的語句不時不生效。

方法三:使用 script_load 和 evalsha

簡而言之,通過 script_load 發送給redis服務器,使加載 lua 腳本,並常駐內存,返回標志,通過 evalsha 按標志進行執行,此連接脫離本次redis 客戶端。

    import redis
    import time

    r = redis.Redis(host='127.0.0.1', port=31320, password='12345678')
    
    lua = """
    local key = KEYS[1]
    local field = ARGV[1]
    local timestamp_new = ARGV[2]
    
    -- get timestamp of the key in redis
    local timestamp_old = redis.call('hget', key, field)
    -- if timestamp_old == nil, it means the key is not exist
    if timestamp_old == nil or timestamp_old == false or timestamp_new > timestamp_old then
        redis.call('hset', key, field .. 1, timestamp_new)
        -- timestamp_new > timestamp_old
        return redis.pcall('hset', key, field, timestamp_new)
    end
    
    """
    sha = r.script_load(lua)
    print(r.evalsha(sha, 1, 'current', 'time', time.time()))

Redis 管理Lua腳本:(Python下為 script_... )

  • script load
    此命令用於將Lua腳本加載到Redis內存中
  • script exists
    scripts exists sha1 [sha1 …]  
    此命令用於判斷sha1是否已經加載到Redis內存中
  • script flush 
    此命令用於清除Redis內存已經加載的所有Lua腳本,在執行script flush后,所有 sha 不復存在。
  • script kill 
    此命令用於殺掉正在執行的Lua腳本。

方法四:eval

使用方法與方法三類似,但是eval是一次性請求,每次請求,必須攜帶 lua 腳本

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM