什么是緩存(cache):
在項目中沒有必要每次請求都查詢數據庫的情況就可以使用緩存,讓每次請求先查詢緩存,如果命中,就直接返回緩存結果,如果沒有命中,就查詢數據庫, 並將查詢結果放入緩存,下次請求時查詢緩存命中,直接返回結果,就不用再次查詢數據庫。
緩存的作用?
緩和較慢存儲的高頻請求,緩解數據庫壓力,提升響應速率。
為什么緩存可以提高響應速度?
因為緩存時基於內存的存儲的,內存的讀寫速率是普通SSD硬盤的至少十倍,更何況機械硬盤了:看對比圖

緩存介質?
web項目中常用的緩存是memcached和redis,它們都支持分布式存儲
緩存一定能給項目響應速率帶來較大提升嗎?
答案是不見得,要根據項目實際情況分析,有沒有使用緩存的不要。在考慮使用緩存前,不妨先問問自己:
1. 項目的讀寫操作比例為多少,如果是寫多讀少,那緩存真的不一定能幫助你,此時不妨考慮數據庫分庫分表,然后做MySQL的分布式集群,或者簡單直接,將硬盤全部替換為SSD(如果你的公司財大氣粗),反之,以讀為主的項目就比較適合加緩存了
2. 項目的訪問頻率高不高(用戶多不多)?如果用戶區區幾千人或幾萬人,全然沒有必要使用緩存,這點訪問量經過網絡后幾乎不會造成並發,即使偶出現幾萬的並發,MySQL也是扛得住的,強行使用緩存反而會增加代碼復雜度,甚至不容易維護,得不償失。
3. 數據是否要求強一致性?如果項目涉及到金錢或者重要數據,且數據頻繁發生變化,不允許存在一點差異,那是否使用緩存就要慎重慎重再慎重!因為緩存適用的是對數據一致性不是特別高的項目,如果使用,需要對緩存的設計有很好的方案,非常考驗技術功底
說了這么多,進入正題吧,我們通過代碼來模擬一下緩存的使用:
redis版本:
1 #!/usr/bin/python 2 # -*- coding: UTF-8 -*- 3 import functools 4 import redis 5 import time 6 import json 7 """ 8 使用redis做緩存,這里模擬一個web接口緩存的例子 9 """ 10 11 # 這里使用redis連接池,管理redisservice的所有連接,避免每次創建關閉連接的開銷 12 pool = redis.ConnectionPool(host='127.0.0.1', port=6379) 13 redis_cli = redis.Redis(connection_pool=pool) 14 15 def redis_cache(func): 16 @functools.wraps(func) # 為了保留原函數的屬性,因為被裝飾的函數對外暴露的是裝飾器的屬性 17 def wrapper(*args,**kargs): 18 start_time = time.time() 19 _key = 'function-name:{},args:{},kargs:{}'.format(func.__name__,args,kargs) #定義key的形式:函數名加與參數組成唯一的key 20 result = redis_cli.get(_key) 21 if result: # redis查找到對應的key,直接返回結果 22 result = json.loads(result) 23 print(type(result)) 24 print('redis find:{},result:{}'.format(_key,result)) 25 else: # redis沒有查找到對應key,查詢執行函數,查詢mysql 26 print('redis not find:{}'.format(_key)) 27 result = func(*args,**kargs) 28 redis_cli.setex(_key,json.dumps(result),5) #將mysql結果寫入redis,並設置過期時間 單位s 29 print("final result:{}".format(result)) 30 end_time = time.time()-start_time 31 print("Total time of this query:{}".format(end_time)) 32 return result 33 return wrapper 34 35 36 @redis_cache 37 def mysql_dispose(name,age): 38 time.sleep(2) 39 result = {'name:':name,'age':age} 40 print('mysql-result:{}'.format(result)) 41 return(result) 42 43 44 if __name__ == '__main__': 45 mysql_dispose('zz3',45) 46 47 48 out-put>>>: 49 第一次執行: 50 redis not find:function-name:mysql_dispose,args:('zz3', 45),kargs:{} 51 mysql-result:{'name:': 'zz3', 'age': 45} 52 final result:{'name:': 'zz3', 'age': 45} 53 Total time of this query:2.0049448013305664 54 55 第二次執行(距第一次5秒內執行): 56 <class 'dict'> 57 redis find:function-name:mysql_dispose,args:('zz3', 45),kargs:{},result:{'name:': 'zz3', 'age': 45} 58 Total time of this query:0.005013942718505859 59 60 第三次執行(5秒后)因為redis key過期被刪除,所以無法命中,請求會再次查詢數據庫,然后添加緩存: 61 redis not find:function-name:mysql_dispose,args:('zz3', 45),kargs:{} 62 mysql-result:{'name:': 'zz3', 'age': 45} 63 final result:{'name:': 'zz3', 'age': 45} 64 Total time of this query:2.0038458017378002
不難看出,原本需要2秒才能完成的數據庫查詢動作,再有了redis緩存后可以直接返回結果,提高了響應速率
memcached版本(與上面代碼大同小異,但是會有坑,注意紅色標記部分)
#!/usr/bin/python # -*- coding: UTF-8 -*- import functools #要使用這個庫,需要先安裝:pip install Python-memcached import memcacheimport time import json """ 使用memcache做緩存,這里模擬一個web接口緩存的例子 """ # 連接到memcached服務器 conn = memcache.Client(['localhost:11211']) def redis_cache(func): @functools.wraps(func) # 為了保留原函數的屬性,因為被裝飾的函數對外暴露的是裝飾器的屬性 def wrapper(*args,**kargs): start_time = time.time() _key = 'function-name:{},args:{},kargs:{}'.format(func.__name__,args,kargs) #定義key的形式 result = conn.get(_key) if result: # memcached查找到對應的key,直接返回結果 result = json.loads(result) print(type(result)) print('memcached find:{},result:{}'.format(_key,result)) else: # memcached沒有查找到對應key,查詢執行函數,查詢mysql print('memcached not find:{}'.format(_key)) result = func(*args,**kargs) conn.set(_key,json.dumps(result),5) #將mysql結果寫入memcached,並設置過期時間 單位s print("final result:{}".format(result)) end_time = time.time()-start_time print("Total time of this query:{}".format(end_time)) return result return wrapper @redis_cache def mysql_dispose(name,age): time.sleep(2) result = {'name:':name,'age':age} print('mysql-result:{}'.format(result)) return(result) if __name__ == '__main__': mysql_dispose('zz3',45)
當我將redis換為memcache后,運行發現居然報錯了!:
Traceback (most recent call last): File "memcache_cache.py", line 50, in <module> mysql_dispose('zz3',45) File "memcache_cache.py", line 25, in wrapper result = conn.get(_key) File "C:\Users\sys_syscafhost\AppData\Local\Programs\Python\Python36\lib\site-packages\memcache.py", line 888, in get return self._get('get', key) File "C:\Users\sys_syscafhost\AppData\Local\Programs\Python\Python36\lib\site-packages\memcache.py", line 837, in _get self.check_key(key) File "C:\Users\sys_syscafhost\AppData\Local\Programs\Python\Python36\lib\site-packages\memcache.py", line 1053, in check_key "Control characters not allowed") memcache.MemcachedKeyCharacterError: Control characters not allowed
說明我們的key有空格字符,what?redis版本中一點問題沒有,這里卻說不允許,先打印出來看看:
_key : function-name:mysql_dispose,args:('zz3', 45),kargs:{}
原來是'zz3',和45之間有空格,'所以大家要注意,memcached key不能包含空格,但是這是python args自動解析填寫的,怎么辦?
算了,一種是對_key處理,去掉空格,但是感覺麻煩且別扭,索性取_keyde MD5的結果,這里用到標准庫:hashlib
代碼修改后為:
#!/usr/bin/python # -*- coding: UTF-8 -*- import functools #要使用這個庫,需要先安裝:pip install Python-memcached import memcache import hashlib import time import json """ 使用memcache做緩存,這里模擬一個web接口緩存的例子 """ # 連接到memcached服務器 conn = memcache.Client(['localhost:11211']) def redis_cache(func): @functools.wraps(func) # 為了保留原函數的屬性,因為被裝飾的函數對外暴露的是裝飾器的屬性 def wrapper(*args,**kargs): start_time = time.time() key = 'function-name:{},args:{},kargs:{}'.format(func.__name__,args,kargs) #定義key的形式 hash_obj = hashlib.md5() hash_obj.update(key.encode(encoding='utf-8')) _key_hash = hash_obj.hexdigest() result = conn.get(_key_hash) if result: # memcached查找到對應的key,直接返回結果 result = json.loads(result) print(type(result)) print('memcached find:{},result:{}'.format(_key_hash,result)) else: # memcached沒有查找到對應key,查詢執行函數,查詢mysql print('memcached not find:{}'.format(_key_hash)) result = func(*args,**kargs) conn.set(_key_hash,json.dumps(result),5) #將mysql結果寫入memcached,並設置過期時間 單位s print("final result:{}".format(result)) end_time = time.time()-start_time print("Total time of this query:{}".format(end_time)) return result return wrapper @redis_cache def mysql_dispose(name,age): time.sleep(2) result = {'name:':name,'age':age} print('mysql-result:{}'.format(result)) return(result) if __name__ == '__main__': mysql_dispose('zz3',45)
#這下正常運行了,output>>>
第一次運行:
memcached not find:1477b8c668df1f570293f4c374963638
mysql-result:{'name:': 'zz3', 'age': 45}
final result:{'name:': 'zz3', 'age': 45}
Total time of this query:2.005075693130493
第二次運行(5s內):
<class 'dict'>
memcached find:1477b8c668df1f570293f4c374963638,result:{'name:': 'zz3', 'age': 45}
final result:{'name:': 'zz3', 'age': 45}
Total time of this query:0.004010915756225586
第三次(5s后):
memcached not find:1477b8c668df1f570293f4c374963638
mysql-result:{'name:': 'zz3', 'age': 45}
final result:{'name:': 'zz3', 'age': 45}
Total time of this query:2.015402317047119
接下來就是項目中使用緩存不得不考慮的問題了,也是緩存的三大問題:
1.緩存穿透,
2.緩存過期(擊穿),
3.緩存雪崩
這里用一張表(students)做描述:
s_id, s_name, s_birth, s_sex
01趙雷1990-01-01男
02錢電1990-12-21男
03孫風1990-05-20男
04李雲1990-08-06男
05周梅1991-12-01女
06吳蘭1992-03-01女
07鄭竹1989-07-01女
08王菊1990-01-20女
緩存穿透:
也就是指查詢目標不在緩存,也不在數據庫中存在:比如:某個用戶請求查詢name=‘夏雨’的記錄:
先會在redis cache中查找,沒有值,再查數據庫中也找不到結果,這就造成了數據會穿透redis cache而每次命中mysql

試想:如果有用戶惡意攻擊怎么辦?故意發起10W次並發請求怎么辦?所有的請求都會穿過redis cache而次次命中mysql,對mysql造成了非常大的壓力,甚至可能會出現宕機.
怎么辦呢?一般來說,兩種處理方式:
1. redis cache緩存空數據,也就是,即使mysql未查詢到結果,也將這個請求與參數保存在redis,key=xxxxxxnameis夏雨,vaule=None,下次同樣的請求直接redis返回None,不再查詢數據庫
這樣的方法比較挫,因為雖然省事,但是勢必會在緩存中增加很多無用的信息,這種時間最好設置key的過期時間,使無用的key在一段短時間內自動刪除,當然,這種方法適用於無效key較少的情況使用
2. 使用BloomFilter(布隆過濾器)
需要在redis cache之前再加一層,

當name=‘夏雨’的請求再次要求查詢時,先查詢BloomFilter中key是否存在,不存在就直接返回none,存在,再走redis查詢
緩存過期:
我們在設置緩存的key--value時,會設置一個過期時間,再有效期內,大量的查詢都會被redis攔截處理,並返回結果,但是,當大量請求持續查詢redis某個key時比如name=‘趙雷’,正好redis的這個key到了過期時間,被自動刪除了,那redis查不到了,大量的查詢請求就會到mysql這邊,而mysql給第一次請求查詢時用了2秒才返回,並寫入redis cache,那這2秒之內,redis還未重新更新數據,剩下的大量查詢還時指向了mysql,造成mysql的IO阻塞,數據庫可能宕機.,且這些查詢mysql后會重復寫redis。
解決方法:
1. 業務邏輯控制>>>從緩存拿數據時獲取剩余剩余有效時間,如果小於1s,就重新設置有效時間,流程和代碼如下:

代碼:我們對上述代碼做一個簡單修改:
def redis_cache(func): @functools.wraps(func) # 為了保留原函數的屬性,因為被裝飾的函數對外暴露的是裝飾器的屬性 def wrapper(*args,**kargs): start_time = time.time() _key = 'function-name:{},args:{},kargs:{}'.format(func.__name__,args,kargs) #定義key的形式 period_time = redis_cli.ttl(_key) if period_time: # 獲取key的過期時間,如果key不存在,redis存儲器會返回-2,我們python redis庫封裝后返回None if period_time <= 1: redis_cli.expire(_key,5) result = redis_cli.get(_key) # redis查找到對應的key,直接返回結果 result = json.loads(result) print(type(result)) print('redis find:{},result:{}'.format(_key,result)) else: # redis沒有查找到對應key,查詢執行函數,查詢mysql print('redis not find:{}'.format(_key)) result = func(*args,**kargs) redis_cli.setex(_key,json.dumps(result),5) #將mysql結果寫入redis,並設置過期時間 單位s print("final result:{}".format(result)) end_time = time.time()-start_time print("Total time of this query:{}".format(end_time)) return result return wrapper
2. 加鎖
對於key失效的情況,如果有100個請求同時要求查詢:第一個請求發現緩存中沒有這個key,就上鎖,然后去查詢mysql數據庫,其他請求被阻塞,等第一個請求拿到mysql,並更新到緩存后,釋放鎖。阻塞的99個請求此時查詢緩存,全部命中,這樣就保護了mysql。
模擬這樣情況 代碼如下:
1 #!/usr/bin/python 2 # -*- coding: UTF-8 -*- 3 import functools 4 import redis 5 import time 6 import json 7 import threading 8 9 """ 10 使用redis做緩存,這里模擬一個web接口緩存的例子 11 """ 12 13 # 這里使用redis連接池,管理redisservice的所有連接,避免每次創建關閉連接的開銷 14 pool = redis.ConnectionPool(host='127.0.0.1', port=6379) 15 redis_cli = redis.Redis(connection_pool=pool) 16 17 18 def redis_cache(func): 19 @functools.wraps(func) # 為了保留原函數的屬性,因為被裝飾的函數對外暴露的是裝飾器的屬性 20 def wrapper(*args, **kargs): 21 start_time = time.time() 22 _key = 'function-name:{},args:{},kargs:{}'.format(func.__name__, args, kargs) # 定義key的形式 23 result = redis_cli.get(_key) # redis查找到對應的key,直接返回結果 24 if result: 25 result = json.loads(result) 26 print(type(result)) 27 print('redis find:{},result:{}'.format(_key, result)) 28 else: # redis沒有查找到對應key,查詢執行函數,查詢mysql 29 lock.acquire() #1 30 result = redis_cli.get(_key) #2 31 if result: #3 32 print('Value to be found in the redis·························') 33 else: #4 34 print('redis not find:{}, find from mysql·······················'.format(_key)) 35 result = func(*args, **kargs) 36 redis_cli.setex(_key, json.dumps(result), 5) # 將mysql結果寫入redis,並設置過期時間 單位s 37 lock.release() # 無論如何,都釋放鎖 38 print("final result:{}".format(result)) 39 end_time = time.time() - start_time 40 print("Total time of this query:{}".format(end_time)) 41 return result 42 43 return wrapper 44 45 46 @redis_cache 47 def mysql_dispose(name, age): 48 time.sleep(2) 49 result = {'name:': name, 'age': age} 50 print('mysql-result:{}'.format(result)) 51 return (result) 52 53 54 if __name__ == '__main__': 55 lock = threading.Lock() # 創建一個不可重入鎖的對象 56 57 # 創建5個線程模擬並發請求 58 t1 = threading.Thread(target=mysql_dispose, args=('zz3', 45,)) 59 t2 = threading.Thread(target=mysql_dispose, args=('zz3', 45,)) 60 t3 = threading.Thread(target=mysql_dispose, args=('zz3', 45,)) 61 t4 = threading.Thread(target=mysql_dispose, args=('zz3', 45,)) 62 t5 = threading.Thread(target=mysql_dispose, args=('zz3', 45,)) 63 64 t1.start() 65 t2.start() 66 t3.start() 67 t4.start() 68 t5.start()
output>>>>
redis not find:function-name:mysql_dispose,args:('zz3', 45),kargs:{}, find from mysql·······················
mysql-result:{'name:': 'zz3', 'age': 45}
final result:{'name:': 'zz3', 'age': 45}
Value to be found in the redis ·······················
final result:b'{"name:": "zz3", "age": 45}'
Total time of this query:2.0075249671936035
Total time of this query:2.0085232257843018
Value to be found in the redis ·······················
final result:b'{"name:": "zz3", "age": 45}'
Value to be found in the redis ·······················
final result:b'{"name:": "zz3", "age": 45}'
Total time of this query:2.0105319023132324
Total time of this query:2.0105326175689697
Value to be found in the redis ·······················
final result:b'{"name:": "zz3", "age": 45}'
Total time of this query:2.0155487060546875
做了哪些改動?看標紅的4步
1. 第一個查詢沒有從redis中查到結果就上鎖,
2.再次查詢redis,結果還是沒找到,
3.那就查詢mysql,mysql耗時2秒的查詢過程中,鎖未釋放,而此時,第二個線程第一次查詢redis(第23行),結果不存在,就走到else分支,去上鎖,發現鎖未被第一個線程釋放,那就阻塞,其他線程也一樣的道理,都阻塞
4. 第一個線程查詢完mysql,將結果更新到redis,然后釋放鎖,第二個線程發現鎖被釋放,立刻上鎖,到第2步,再次查詢redis,發現這時redis的key-value存在了,那就直接拿到結果,然后釋放鎖,不會再查詢mysql,其他線程也是這樣的過程
不難看出。這樣會導致大量的請求阻塞,但是會保護mysql,至於取舍,就看項目需求了
緩存雪崩
雪崩就好理解了,一句話,就是redis掛掉了或者redis的key在同一時刻全部過期了,那所有的請求就會全部查詢數據庫,可想而知,如果同一時刻有大量請求,那對數據庫造成的壓力也是非同小可
怎么辦?
1.先考慮redis key過期的情況,既然為了防止同時過期,那就可以考慮給key設置過期時間時加上一個隨機數,比如,key1的過期時間是600s,隨機數為6,那key1的過期時間就是606s,為什么不直接不設置過期時間,使key永不過期呢?嗯。。。。。因為內存空間實在有限且珍貴,很多key在某個時間段后其實就沒有了價值,應為用戶查詢少或者完全不會再次查詢,這樣的key如果無限制存在於內存中簡直是災難,內存遲早會被吃空!代碼稍加修改,在設置過期時間是加上隨機數:
import random
random_num = random.randint(1,9) redis_cli.setex(_key, json.dumps(result), 5+random_num) # 將mysql結果寫入redis,並設置過期時間 單位s
2. redis掛掉的解決方法:
試想,redis是單節點的時候掛掉,就會出現這種情況,那就應該考慮redis分布式集群,保證至少有一個redis活着,redis的分布式集群這里就不做說明了,可以自己網上搜索
那要是不幸,整個redis集群都掛掉了怎么辦?那就應該限流,常見的限流算法有:計數器、漏桶和令牌桶算法。可以自己去了解一下
