redis事務
redis事務介紹:
1. redis事務可以一次執行多個命令,本質是一組命令的集合。
2.一個事務中的所有命令都會序列化,按順序串行化的執行而不會被其他命令插入
作用:一個隊列中,一次性、順序性、排他性的執行一系列命令
multi指令的使用
1. 下面指令演示了一個完整的事物過程,所有指令在exec前不執行,而是緩存在服務器的一個事物隊列中
2. 服務器一旦收到exec指令才開始執行事物隊列,執行完畢后一次性返回所有結果
3. 因為redis是單線程的,所以不必擔心自己在執行隊列是被打斷,可以保證這樣的“原子性”
注:redis事物在遇到指令失敗后,后面的指令會繼續執行
# Multi 命令用於標記一個事務塊的開始事務塊內的多條命令會按照先后順序被放進一個隊列當中,最后由 EXEC 命令原子性( atomic )地執行 > multi(開始一個redis事物) incr books incr books > exec (執行事物) > discard (丟棄事物)

[root@redis ~]# redis-cli 127.0.0.1:6379> multi OK 127.0.0.1:6379> set test 123 QUEUED 127.0.0.1:6379> exec 1) OK 127.0.0.1:6379> get test "123" 127.0.0.1:6379> multi OK 127.0.0.1:6379> set test 456 QUEUED 127.0.0.1:6379> discard OK 127.0.0.1:6379> get test "123" 127.0.0.1:6379>

def multi_test(): r = redis.Redis(host='127.0.0.1') pipe = r.pipeline() pipe.multi() #開啟事務 pipe.set('key2', 400) #存儲子命令 pipe.execute() #執行事務 print("第一次事務提交后的結果"+r.get('key2').decode("utf-8")) pipe.multi() # 開啟事務 pipe.set('key2', 100) # 存儲子命令 print("第二次未提交事務的結果"+r.get("key2").decode("utf-8")) #第一次事務提交后的結果400 #第二次未提交事務的結果400
注:mysql的rollback與redis的discard的區別
1. mysql回滾為sql全部成功才執行,一條sql失敗則全部失敗,執行rollback后所有語句造成的影響消失
2. redis的discard只是結束本次事務,正確命令造成的影響仍然還在.
1)redis如果在一個事務中的命令出現錯誤,那么所有的命令都不會執行;
2)redis如果在一個事務中出現運行錯誤,那么正確的命令會被執行。
watch 指令作用
實質:WATCH 只會在數據被其他客戶端搶先修改了的情況下通知執行命令的這個客戶端(通過 WatchError 異常)但不會阻止其他客戶端對數據的修改
1. watch其實就是redis提供的一種樂觀鎖,可以解決並發修改問題
2. watch會在事物開始前盯住一個或多個關鍵變量,當服務器收到exec指令要順序執行緩存中的事物隊列時,redis會檢查關鍵變量自watch后是否被修改
3. WATCH 只會在數據被其他客戶端搶先修改了的情況下通知執行命令的這個客戶端(通過 WatchError 異常)但不會阻止其他客戶端對數據的修改
watch+multi實現樂觀鎖
setnx指令(redis的分布式鎖)
1、分布式鎖
1. 分布式鎖本質是占一個坑,當別的進程也要來占坑時發現已經被占,就會放棄或者稍后重試
2. 占坑一般使用 setnx(set if not exists)指令,只允許一個客戶端占坑
3. 先來先占,用完了在調用del指令釋放坑
> setnx lock:codehole true .... do something critical .... > del lock:codehole
4. 但是這樣有一個問題,如果邏輯執行到中間出現異常,可能導致del指令沒有被調用,這樣就會陷入死鎖,鎖永遠無法釋放
5. 為了解決死鎖問題,我們拿到鎖時可以加上一個expire過期時間,這樣即使出現異常,當到達過期時間也會自動釋放鎖
> setnx lock:codehole true > expire lock:codehole 5 .... do something critical .... > del lock:codehole
6. 這樣又有一個問題,setnx和expire是兩條指令而不是原子指令,如果兩條指令之間進程掛掉依然會出現死鎖
7. 為了治理上面亂象,在redis 2.8中加入了set指令的擴展參數,使setnx和expire指令可以一起執行
> set lock:codehole true ex 5 nx ''' do something ''' > del lock:codehole
redis解決超賣問題
1、使用reids的 watch + multi 指令實現

#! /usr/bin/env python # -*- coding: utf-8 -*- import redis def sale(rs): while True: with rs.pipeline() as p: try: p.watch('apple') # 監聽key值為apple的數據數量改變 count = int(rs.get('apple')) print('拿取到了蘋果的數量: %d' % count) p.multi() # 事務開始 if count> 0 : # 如果此時還有庫存 p.set('apple', count - 1) p.execute() # 執行事務 p.unwatch() break # 當庫存成功減一或沒有庫存時跳出執行循環 except Exception as e: # 當出現watch監聽值出現修改時,WatchError異常拋出 print('[Error]: %s' % e) continue # 繼續嘗試執行 rs = redis.Redis(host='127.0.0.1', port=6379) # 連接redis rs.set('apple',1000) # # 首先在redis中設置某商品apple 對應數量value值為1000 sale(rs)
1)原理
1. 當用戶購買時,通過 WATCH 監聽用戶庫存,如果庫存在watch監聽后發生改變,就會捕獲異常而放棄對庫存減一操作
2. 如果庫存沒有監聽到變化並且數量大於1,則庫存數量減一,並執行任務
2)弊端
1. Redis 在嘗試完成一個事務的時候,可能會因為事務的失敗而重復嘗試重新執行
2. 保證商品的庫存量正確是一件很重要的事情,但是單純的使用 WATCH 這樣的機制對服務器壓力過大
2、使用reids的 watch + multi + setnx 指令實現
1)為什么要自己構建鎖
1. 雖然有類似的 SETNX 命令可以實現 Redis 中的鎖的功能,但他鎖提供的機制並不完整
2. 並且setnx也不具備分布式鎖的一些高級特性,還是得通過我們手動構建
2)創建一個redis鎖
1. 在 Redis 中,可以通過使用 SETNX 命令來構建鎖:rs.setnx(lock_name, uuid值)
2. 而鎖要做的事情就是將一個隨機生成的 128 位 UUID 設置位鍵的值,防止該鎖被其他進程獲取
3)釋放鎖
1. 鎖的刪除操作很簡單,只需要將對應鎖的 key 值獲取到的 uuid 結果進行判斷驗證
2. 符合條件(判斷uuid值)通過 delete 在 redis 中刪除即可,rs.delete(lockname)
3. 此外當其他用戶持有同名鎖時,由於 uuid 的不同,經過驗證后不會錯誤釋放掉別人的鎖
4)解決鎖無法釋放問題
1. 在之前的鎖中,還出現這樣的問題,比如某個進程持有鎖之后突然程序崩潰,那么會導致鎖無法釋放
2. 而其他進程無法持有鎖繼續工作,為了解決這樣的問題,可以在獲取鎖的時候加上鎖的超時功能

import redis import uuid import time # 1.初始化連接函數 def get_conn(host,port=6379): rs = redis.Redis(host=host, port=port) return rs # 2. 構建redis鎖 def acquire_lock(rs, lock_name, expire_time=10): ''' rs: 連接對象 lock_name: 鎖標識 acquire_time: 過期超時時間 return -> False 獲鎖失敗 or True 獲鎖成功 ''' identifier = str(uuid.uuid4()) end = time.time() + expire_time while time.time() < end: # 當獲取鎖的行為超過有效時間,則退出循環,本次取鎖失敗,返回False if rs.setnx(lock_name, identifier): # 嘗試取得鎖 return identifier # time.sleep(.001) return False # 3. 釋放鎖 def release_lock(rs, lockname, identifier): ''' rs: 連接對象 lockname: 鎖標識 identifier: 鎖的value值,用來校驗 ''' if rs.get(lockname).decode() == identifier: # 防止其他進程同名鎖被誤刪 rs.delete(lockname) return True # 刪除鎖 else: return False # 刪除失敗 #有過期時間的鎖 def acquire_expire_lock(rs, lock_name, expire_time=10, locked_time=10): ''' rs: 連接對象 lock_name: 鎖標識 acquire_time: 過期超時時間 locked_time: 鎖的有效時間 return -> False 獲鎖失敗 or True 獲鎖成功 ''' identifier = str(uuid.uuid4()) end = time.time() + expire_time while time.time() < end: # 當獲取鎖的行為超過有效時間,則退出循環,本次取鎖失敗,返回False if rs.setnx(lock_name, identifier): # 嘗試取得鎖 # print('鎖已設置: %s' % identifier) rs.expire(lock_name, locked_time) return identifier time.sleep(.001) return False '''在業務函數中使用上面的鎖''' def sale(rs): start = time.time() # 程序啟動時間 with rs.pipeline() as p: ''' 通過管道方式進行連接 多條命令執行結束,一次性獲取結果 ''' while 1: lock = acquire_lock(rs, 'lock') if not lock: # 持鎖失敗 continue #開始監測"lock" p.watch("lock") try: #開啟事務 p.multi() count = int(rs.get('apple')) # 取量 p.set('apple', count-1) # 減量 # time.sleep(5) #提交事務 p.execute() print('當前庫存量: %s' % count) #成功則跳出循環 break except: #事務失敗對應處理 print("修改數據失敗") #無論成功與否最終都需要釋放鎖 finally: res = release_lock(rs, 'lock', lock) #釋放鎖成功, if res: print("刪除鎖成功") #釋放鎖失敗,強制刪除 else: print("刪除鎖失敗,強制刪除鎖") res = rs.delete('lock') print(res) print('[time]: %.2f' % (time.time() - start)) rs = redis.Redis(host='127.0.0.1', port=6379) # 連接redis # rs.set('apple',1000) # # 首先在redis中設置某商品apple 對應數量value值為1000 sale(rs)
優化鎖無法釋放的問題,為鎖添加過期時間
def acquire_expire_lock(rs, lock_name, expire_time=10, locked_time=10): ''' rs: 連接對象 lock_name: 鎖標識 acquire_time: 過期超時時間 locked_time: 鎖的有效時間 return -> False 獲鎖失敗 or True 獲鎖成功 ''' identifier = str(uuid.uuid4()) end = time.time() + expire_time while time.time() < end: # 當獲取鎖的行為超過有效時間,則退出循環,本次取鎖失敗,返回False if rs.setnx(lock_name, identifier): # 嘗試取得鎖 # print('鎖已設置: %s' % identifier) rs.expire(lock_name, locked_time) return identifier time.sleep(.001) return False