Python 多線程之 Redis 分布式鎖


前言


在很多互聯網產品應用中,有些場景需要加鎖處理,例如:雙11秒殺,全局遞增ID,樓層生成等等。

大部分的解決方案是基於 DB 實現的,Redis 為單進程單線程模式,采用隊列模式將並發訪問變成串行訪問,且多客戶端對 Redis 的連接並不存在競爭關系。

其次 Redis 提供一些命令SETNX,GETSET,可以方便實現分布式鎖機制。

 

Python代碼實現

import time
import redis
import threading
 
#使用連接池方式連接redis
redis_pool=redis.ConnectionPool(host="127.0.0.1",port=6379)
redis_conn=redis.Redis(connection_pool=redis_pool)
 
#定義redis類
class RedisLock():
    def __init__(self):
        self.redis_conn = redis_conn
        print("init the redis connection")
 
    #獲取鎖
    def get_lock(self,name,value):
        while True:
            # set(name, value, ex=None, px=None, nx=False, xx=False)
            # nx - 如果設置為True,則只有name不存在時,當前set操作才執行
            # ex - 過期時間(秒)
            result=self.redis_conn.set(name,value,nx=True,ex=3)
            # print(result)
            if(result):
                # 獲取到result后就終止while循環
                break
            # time.sleep(0.5)
 
    #釋放鎖
    def release_lock(self,name,value):
        #獲取原name key對應的value
        old_value = redis_conn.get(name)
        print("--------------------------------the key =%s ;the name=%s"%(name,old_value))
        #判斷原value 與 要釋放的值是否相同
        if(old_value == value):
            #相同就從redis里面釋放
            self.redis_conn.delete(name)
            print("release the lock is success")
 
def redis_lock_test(lock,name,value):
    try:
        print("% --start to work"%name)
        print("% --ready get the lock and execute lock operation"%name)
        lock.get_lock(name,value)#這里是獲取鎖操作
        print("% --get the lock and continue to operation"%name)
    except Exception as e:
        print("the exception is:%s"%str(e))
    finally:
        print("% --ready release the lock"%name)
        lock.release_lock(name,value)#最終必須釋放鎖操作
        print("% --release the lock is over"%name)
 
if __name__ == '__main__':
    start_time=time.time()
    rs=RedisLock()
    tasks=[]
    for i in range(1,3):
        # 創建線程
        t = threading.Thread(target=redis_lock_test(rs,"task-name%"%i,"lock%d"%i))
        # 將創建的線程放入列表
        tasks.append(t)
        # 啟動線程
        t.start()
    #這里沒有設置守護線程且沒有設置join函數的timeout參數時,主線程將會一直等待,直到子線程全部結束,主線程才結束,程序退出
    [t.join() for t in tasks]
    print("total waster time is:",time.time()-start_time)

 

注意:
1. thread.setDaemon(True) 當設置守護線程 join 函數的參數 timeout=2 時,主線程將會等待多個子線程 timeout 的累加和這樣的一段時間,時間一到,主線程結束,殺死未執行完的子線程,程序退出。

 

2. 當沒有設置守護線程且 join 函數的參數 timeout=2 時,主線程將會等待多個子線程 timeout 的累加和這樣的一段時間,時間一到主線程結束,但是並沒有殺死子線程,子線程依然可以繼續執行,直到子線程全部結束,程序退出。

 

歡迎關注【無量測試之道】公眾號,回復【領取資源】

Python+Unittest框架API自動化、

Python+Unittest框架API自動化、

Python+Pytest框架API自動化、

Python+Pandas+Pyecharts大數據分析、

Python+Selenium框架Web的UI自動化、

Python+Appium框架APP的UI自動化、

Python編程學習資源干貨、

資源和代碼 免費送啦~
文章下方有公眾號二維碼,可直接微信掃一掃關注即可。

備注:我的個人公眾號已正式開通,致力於IT互聯網技術的分享。

包含:數據分析、大數據測試、機器學習、測試開發、API接口自動化、測試運維、UI自動化、性能測試、代碼檢測、編程技術等。

微信搜索公眾號:“無量測試之道”,或掃描下方二維碼:

添加關注,讓我們一起共同成長!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM