用redis解決訂單超發問題的4種方法
# -*- coding: utf-8 -*-
# 300~5000 people compete for 100 tickets; the stock must never be oversold.
import redis
import time
import threading
from redis import WatchError
from redis_lock import synchronized

REDIS_DATABASE = {
    'HOST': 'localhost',
    'PORT': 6379,
    'DB': 0
}
TICKET_NUM = 100  # number of tickets on sale
PEOPLE_NUM = 3000  # number of buyers (one thread per buyer)

rds = redis.Redis(host=REDIS_DATABASE['HOST'],
                  port=REDIS_DATABASE['PORT'],
                  db=REDIS_DATABASE['DB'])

# Reset the counter key and seed it with the full stock.
rds.delete('ticket_num')
rst = rds.incr('ticket_num', amount=TICKET_NUM)

# Reset the list key and push one placeholder element per ticket
# (only consumed by TestRedis.redis_list).
rds.delete('tickets')
values = [''] * TICKET_NUM
tickets = rds.lpush('tickets', *values)


class TestRedis(threading.Thread):
    """One buyer thread that tries to take a single ticket.

    ``run`` selects which of the demonstrated strategies is exercised.
    """

    def __init__(self, t_num):
        """
        :param t_num: sequence number of this buyer, used only for printing
        """
        self.t_num = t_num
        super(TestRedis, self).__init__()

    def run(self):
        """Thread entry point: un-comment exactly one strategy to try it."""
        # self.error_examples()    # wrong on purpose: oversells under threads
        # self.optimistic_lock()   # redis transaction (optimistic lock)
        # self.pessimistic_lock()  # hand-written pessimistic lock
        # self.redis_list()        # redis is single-threaded: pop from a list
        self.redis_incr()          # recommended: redis is single-threaded, use a counter

    def error_examples(self):
        """
        Wrong on purpose: read-then-write without any protection, so several
        threads can see the same value and the stock is oversold.
        :return:
        """
        ticket_num = int(rds.get('ticket_num'))
        time.sleep(0.1)  # the sleep widens the race window
        if ticket_num > 0:
            print('t_num=%s, ticket_num=%s' % (self.t_num, ticket_num))
            rds.set('ticket_num', ticket_num-1)

    def optimistic_lock(self):
        """
        Optimistic lock: WATCH the counter, read it, and decrement inside
        MULTI/EXEC; retry when another client modified the key in between.
        Returns without decrementing once the tickets are sold out.
        :return:
        """
        while True:
            with rds.pipeline(transaction=True) as r_pip:
                try:
                    r_pip.watch('ticket_num')
                    # After WATCH the pipeline executes commands immediately,
                    # so this read happens on the watching connection.
                    ticket_num = int(r_pip.get('ticket_num'))
                    if ticket_num <= 0:
                        # Sold out: stop instead of retrying forever.
                        return
                    r_pip.multi()
                    r_pip.decr('ticket_num')
                    r_pip.execute()
                    return
                except WatchError:
                    # Someone else changed the counter; start over.
                    continue

    @synchronized(rds, "lock", 1000)
    def pessimistic_lock(self):
        """
        Pessimistic lock: the whole read-check-decrement runs while holding
        the redis lock named "lock".
        :return:
        """
        ticket_num = int(rds.get('ticket_num'))
        if ticket_num > 0:
            rds.decr('ticket_num')

    def redis_list(self):
        """
        Pop one element from the ticket list; a successful pop means a ticket
        was obtained. Relies on redis executing commands one at a time.
        Drawback: costs memory (one list element per ticket).
        :return:
        """
        ticket = rds.lpop('tickets')
        if ticket is not None:
            rds.decr('ticket_num')

    def redis_incr(self):
        """
        Counter approach, relying on redis executing commands one at a time.
        :return:
        """
        time.sleep(0.1)
        if int(rds.get('ticket_num')) > 0:
            de_num = rds.decr('ticket_num')
            # When only the last ticket is left, several threads may all read 1
            # and decrement, which drives the counter negative ("oversold").
            if de_num < 0:
                # Give the ticket back after overshooting and do nothing else.
                print('Overshoot callback %s' % de_num)
                rds.incr('ticket_num')


# Create every buyer first, then start them together and wait for all of them.
tests = []
for i in range(PEOPLE_NUM):
    t = TestRedis(i+1)
    tests.append(t)

s = time.time()
for t in tests:
    t.start()
for t in tests:
    t.join()

print('result ticket_num=%s, time=%s' % (rds.get('ticket_num'), (time.time()-s)*1000))
redis_lock.py
# coding: utf-8
"""
Distributed pessimistic lock on top of redis. It has to cope with:
1. A acquires the lock and then crashes: the lock must still get released.
2. A holds the lock for too long, the lock expires and B acquires it; when A
   finishes it must not release B's lock by mistake.

The original author notes that redis.Redis() has some quirks and recommends
connecting with redis.StrictRedis().
"""
import math
import time
import uuid
from contextlib import contextmanager
from functools import wraps
from redis import WatchError


def acquire_lock(conn, lock_name, acquire_timeout=1, lock_timeout=1):
    """
    Acquire the lock.
    :param conn: redis connection
    :param lock_name: name of the lock (the redis key)
    :param acquire_timeout: maximum time in seconds to wait for the lock;
        -1 blocks until the lock is obtained
    :param lock_timeout: expiry of the lock in seconds (rounded up to an int)
    :return: the identity string stored in the lock, or None on timeout
    """
    identity = str(uuid.uuid1())
    lock_timeout = int(math.ceil(lock_timeout))

    # Compute the deadline once; recomputing it on every check would push it
    # forward forever and the timeout would never fire.
    if acquire_timeout == -1:
        acquire_end = None
    else:
        acquire_end = time.time() + acquire_timeout

    def should_acquire():
        return acquire_end is None or time.time() < acquire_end

    while should_acquire():
        # SET ... NX EX: create the key with an expiry only if it is absent.
        if conn.set(lock_name, identity, ex=lock_timeout, nx=True):
            return identity
        pttl = conn.pttl(lock_name)
        # The lock exists but reports no expiry (-1 per the Redis PTTL docs;
        # the original author notes the Redis client may report None instead):
        # give it one so a crashed holder cannot block everybody forever.
        if pttl is None or pttl == -1:
            conn.expire(lock_name, lock_timeout)
        time.sleep(.1)
    return None


def release_lock(conn, lock_name, identity):
    """
    Release the lock, but only if it is still owned by ``identity``.
    :param conn: redis connection
    :param lock_name: name of the lock (the redis key)
    :param identity: value returned by acquire_lock; None means the lock was
        never acquired and nothing is released
    :return: True if the lock was deleted, False otherwise
    """
    if identity is None:
        return False

    # The context manager resets the pipeline on every exit path.
    with conn.pipeline(True) as pipe:
        while True:
            try:
                pipe.watch(lock_name)
                current = pipe.get(lock_name)
                # redis-py returns bytes on Python 3 unless decode_responses
                # is enabled; normalise before comparing with the str identity.
                if isinstance(current, bytes):
                    current = current.decode('utf-8')
                if current == identity:
                    # Delete inside MULTI/EXEC so that the WATCH aborts the
                    # delete if the key changed after it was read.
                    pipe.multi()
                    pipe.delete(lock_name)
                    pipe.execute()
                    return True
                pipe.unwatch()
                break
            except WatchError:
                # The key changed between the read and EXEC: check again.
                pass
    return False


@contextmanager
def lock(conn, lock_name, lock_timeout):
    """
    Context manager that blocks until the lock is acquired and releases it
    on exit.

    with lock(conn, "lock", 10):
        do something
    """
    id_ = None
    try:
        id_ = acquire_lock(conn, lock_name, -1, lock_timeout)
        yield id_
    finally:
        release_lock(conn, lock_name, id_)


def synchronized(conn, lock_name, lock_timeout):
    """
    Decorator that runs the wrapped function while holding the lock.

    @synchronized(conn, "lock", 10)
    def fun():
        counter = int(r.get("counter"))
        counter += 1
        r.set("counter", counter)
    """
    def decorator(func):
        @wraps(func)
        def wrap(*args, **kwargs):
            with lock(conn, lock_name, lock_timeout):
                return func(*args, **kwargs)
        return wrap
    return decorator


if __name__ == '__main__':
    import redis
    r = redis.Redis("localhost", db=5)

    id_ = acquire_lock(r, "lock", acquire_timeout=1, lock_timeout=10)
    release_lock(r, "lock", id_)

    with lock(r, "lock", 1):
        print("do something")

    @synchronized(r, "lock", 10)
    def fun():
        # Treat a missing counter as 0 so the demo also runs on a fresh database.
        counter = int(r.get("counter") or 0)
        counter += 1
        r.set("counter", counter)

    for i in range(10000):
        fun()